Skip to content

Commit 2bcdf8c

Browse files
committed
[SPARK-7440][SQL] Remove physical Distinct operator in favor of Aggregate
This patch replaces Distinct with Aggregate in the optimizer, so Distinct will become more efficient over time as we optimize Aggregate (via Tungsten). Author: Reynold Xin <[email protected]> Closes apache#6637 from rxin/replace-distinct and squashes the following commits: b3cc50e [Reynold Xin] Mima excludes. 93d6117 [Reynold Xin] Code review feedback. 87e4741 [Reynold Xin] [SPARK-7440][SQL] Remove physical Distinct operator in favor of Aggregate.
1 parent 6593842 commit 2bcdf8c

File tree

7 files changed

+65
-35
lines changed

7 files changed

+65
-35
lines changed

project/MimaExcludes.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ object MimaExcludes {
4646
"org.apache.spark.api.java.JavaRDDLike.partitioner"),
4747
// Mima false positive (was a private[spark] class)
4848
ProblemFilters.exclude[MissingClassProblem](
49-
"org.apache.spark.util.collection.PairIterator")
49+
"org.apache.spark.util.collection.PairIterator"),
50+
// SQL execution is considered private.
51+
excludePackage("org.apache.spark.sql.execution")
5052
)
5153
case v if v.startsWith("1.4") =>
5254
Seq(

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ object DefaultOptimizer extends Optimizer {
3636
// SubQueries are only needed for analysis and can be removed before execution.
3737
Batch("Remove SubQueries", FixedPoint(100),
3838
EliminateSubQueries) ::
39+
Batch("Distinct", FixedPoint(100),
40+
ReplaceDistinctWithAggregate) ::
3941
Batch("Operator Reordering", FixedPoint(100),
4042
UnionPushdown,
4143
CombineFilters,
@@ -696,3 +698,15 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
696698
LocalRelation(projectList.map(_.toAttribute), data.map(projection))
697699
}
698700
}
701+
702+
/**
703+
* Replaces logical [[Distinct]] operator with an [[Aggregate]] operator.
704+
* {{{
705+
* SELECT DISTINCT f1, f2 FROM t ==> SELECT f1, f2 FROM t GROUP BY f1, f2
706+
* }}}
707+
*/
708+
object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
709+
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
710+
case Distinct(child) => Aggregate(child.output, child.output, child)
711+
}
712+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,9 @@ case class Sample(
339339
override def output: Seq[Attribute] = child.output
340340
}
341341

342+
/**
343+
* Returns a new logical plan that dedups input rows.
344+
*/
342345
case class Distinct(child: LogicalPlan) extends UnaryNode {
343346
override def output: Seq[Attribute] = child.output
344347
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import org.apache.spark.sql.catalyst.dsl.plans._
21+
import org.apache.spark.sql.catalyst.dsl.expressions._
22+
import org.apache.spark.sql.catalyst.plans.PlanTest
23+
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Distinct, LocalRelation, LogicalPlan}
24+
import org.apache.spark.sql.catalyst.rules.RuleExecutor
25+
26+
class ReplaceDistinctWithAggregateSuite extends PlanTest {
27+
28+
object Optimize extends RuleExecutor[LogicalPlan] {
29+
val batches = Batch("ProjectCollapsing", Once, ReplaceDistinctWithAggregate) :: Nil
30+
}
31+
32+
test("replace distinct with aggregate") {
33+
val input = LocalRelation('a.int, 'b.int)
34+
35+
val query = Distinct(input)
36+
val optimized = Optimize.execute(query.analyze)
37+
38+
val correctAnswer = Aggregate(input.output, input.output, input)
39+
40+
comparePlans(optimized, correctAnswer)
41+
}
42+
}

sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1311,7 +1311,7 @@ class DataFrame private[sql](
13111311
* @group dfops
13121312
* @since 1.3.0
13131313
*/
1314-
override def distinct: DataFrame = Distinct(logicalPlan)
1314+
override def distinct: DataFrame = dropDuplicates()
13151315

13161316
/**
13171317
* @group basic

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,8 +284,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
284284
case r: RunnableCommand => ExecutedCommand(r) :: Nil
285285

286286
case logical.Distinct(child) =>
287-
execution.Distinct(partial = false,
288-
execution.Distinct(partial = true, planLater(child))) :: Nil
287+
throw new IllegalStateException(
288+
"logical distinct operator should have been replaced by aggregate in the optimizer")
289289
case logical.Repartition(numPartitions, shuffle, child) =>
290290
execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
291291
case logical.SortPartitions(sortExprs, child) =>

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -230,37 +230,6 @@ case class ExternalSort(
230230
override def outputOrdering: Seq[SortOrder] = sortOrder
231231
}
232232

233-
/**
234-
* :: DeveloperApi ::
235-
* Computes the set of distinct input rows using a HashSet.
236-
* @param partial when true the distinct operation is performed partially, per partition, without
237-
* shuffling the data.
238-
* @param child the input query plan.
239-
*/
240-
@DeveloperApi
241-
case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
242-
override def output: Seq[Attribute] = child.output
243-
244-
override def requiredChildDistribution: Seq[Distribution] =
245-
if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
246-
247-
protected override def doExecute(): RDD[Row] = {
248-
child.execute().mapPartitions { iter =>
249-
val hashSet = new scala.collection.mutable.HashSet[Row]()
250-
251-
var currentRow: Row = null
252-
while (iter.hasNext) {
253-
currentRow = iter.next()
254-
if (!hashSet.contains(currentRow)) {
255-
hashSet.add(currentRow.copy())
256-
}
257-
}
258-
259-
hashSet.iterator
260-
}
261-
}
262-
}
263-
264233
/**
265234
* :: DeveloperApi ::
266235
* Return a new RDD that has exactly `numPartitions` partitions.

0 commit comments

Comments
 (0)