
Commit 3dc9ae2

Davies Liu authored and committed
[SPARK-13523] [SQL] Reuse exchanges in a query
## What changes were proposed in this pull request?

It's possible for a query to contain common parts, for example in a self join, and it would be good to avoid computing the duplicated part twice, saving CPU and memory (broadcast or cache). An Exchange materializes the underlying RDD by shuffle or collect, so it is a natural point at which to check for duplicates and reuse them. Duplicated exchanges are exchanges that generate exactly the same result inside a query.

In order to find the duplicated exchanges, we need to be able to compare two SparkPlans and check whether they have the same results. We already have that for LogicalPlan, so we should move it into QueryPlan to make it available for SparkPlan as well.

Once we can find the duplicated exchanges, we replace all of them with the same SparkPlan object (wrapped in a ReusedExchange for explain), so the plan tree becomes a DAG. Since all the planner rules only work with trees, this rule should be the last one in the entire planning process.

After the rule, the plan looks like this:

```
WholeStageCodegen
:  +- Project [id#0L]
:     +- BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight, None
:        :- Project [id#0L]
:        :  +- BroadcastHashJoin [id#0L], [id#1L], Inner, BuildRight, None
:        :     :- Range 0, 1, 4, 1024, [id#0L]
:        :     +- INPUT
:        +- INPUT
:- BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L))
:  +- WholeStageCodegen
:     :  +- Range 0, 1, 4, 1024, [id#1L]
+- ReusedExchange [id#2L], BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L))
```

![bjoin](https://cloud.githubusercontent.com/assets/40902/13414787/209e8c5c-df0a-11e5-8a0f-edff69d89e83.png)

For a three-way SortMergeJoin:

```
== Physical Plan ==
WholeStageCodegen
:  +- Project [id#0L]
:     +- SortMergeJoin [id#0L], [id#4L], None
:        :- INPUT
:        +- INPUT
:- WholeStageCodegen
:  :  +- Project [id#0L]
:  :     +- SortMergeJoin [id#0L], [id#3L], None
:  :        :- INPUT
:  :        +- INPUT
:  :- WholeStageCodegen
:  :  :  +- Sort [id#0L ASC], false, 0
:  :  :     +- INPUT
:  :  +- Exchange hashpartitioning(id#0L, 200), None
:  :     +- WholeStageCodegen
:  :        :  +- Range 0, 1, 4, 33554432, [id#0L]
:  +- WholeStageCodegen
:     :  +- Sort [id#3L ASC], false, 0
:     :     +- INPUT
:     +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200), None
+- WholeStageCodegen
   :  +- Sort [id#4L ASC], false, 0
   :     +- INPUT
   +- ReusedExchange [id#4L], Exchange hashpartitioning(id#0L, 200), None
```

![sjoin](https://cloud.githubusercontent.com/assets/40902/13414790/27aea61c-df0a-11e5-8cbf-fbc985c31d95.png)

When the same ShuffleExchange or BroadcastExchange has execute()/executeBroadcast() called by different parents, it should cache the RDD/Broadcast and return the same one to all the parents.

## How was this patch tested?

Added some unit tests for this. Also did some manual tests on TPCDS queries Q59 and Q64; we can see that some exchanges are reused (this requires a change in PhysicalRDD for sameResult, done in #11514).

Author: Davies Liu <[email protected]>

Closes #11403 from davies/dedup.
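
As a quick illustration (an editor's sketch, not part of the patch): a self-join is the simplest query that exercises the rule. This assumes a Spark 2.0-era `SQLContext` named `sqlContext`; the variable names are illustrative.

```scala
import sqlContext.implicits._

// A self-join: both sides shuffle/broadcast the same underlying data, so the
// second exchange can be replaced by a ReusedExchange once this rule runs.
val df = sqlContext.range(0, 1000000)
val joined = df.as("a").join(df.as("b"), $"a.id" === $"b.id")

// With exchange reuse enabled, the physical plan printed here should contain
// a ReusedExchange node instead of a second identical Exchange.
joined.explain()
```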

File tree

16 files changed: +403, -90 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 62 additions & 1 deletion

```diff
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.types.{DataType, StructType}
 
-abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
+abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
   self: PlanType =>
 
   def output: Seq[Attribute]
@@ -237,4 +237,65 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
   }
 
   override def innerChildren: Seq[PlanType] = subqueries
+
+  /**
+   * Canonicalized copy of this query plan.
+   */
+  protected lazy val canonicalized: PlanType = this
+
+  /**
+   * Returns true when the given query plan will return the same results as this query plan.
+   *
+   * Since its likely undecidable to generally determine if two given plans will produce the same
+   * results, it is okay for this function to return false, even if the results are actually
+   * the same. Such behavior will not affect correctness, only the application of performance
+   * enhancements like caching. However, it is not acceptable to return true if the results could
+   * possibly be different.
+   *
+   * By default this function performs a modified version of equality that is tolerant of cosmetic
+   * differences like attribute naming and or expression id differences. Operators that
+   * can do better should override this function.
+   */
+  def sameResult(plan: PlanType): Boolean = {
+    val canonicalizedLeft = this.canonicalized
+    val canonicalizedRight = plan.canonicalized
+    canonicalizedLeft.getClass == canonicalizedRight.getClass &&
+      canonicalizedLeft.children.size == canonicalizedRight.children.size &&
+      canonicalizedLeft.cleanArgs == canonicalizedRight.cleanArgs &&
+      (canonicalizedLeft.children, canonicalizedRight.children).zipped.forall(_ sameResult _)
+  }
+
+  /**
+   * All the attributes that are used for this plan.
+   */
+  lazy val allAttributes: Seq[Attribute] = children.flatMap(_.output)
+
+  private def cleanExpression(e: Expression): Expression = e match {
+    case a: Alias =>
+      // As the root of the expression, Alias will always take an arbitrary exprId, we need
+      // to erase that for equality testing.
+      val cleanedExprId =
+        Alias(a.child, a.name)(ExprId(-1), a.qualifiers, isGenerated = a.isGenerated)
+      BindReferences.bindReference(cleanedExprId, allAttributes, allowFailures = true)
+    case other =>
+      BindReferences.bindReference(other, allAttributes, allowFailures = true)
+  }
+
+  /** Args that have cleaned such that differences in expression id should not affect equality */
+  protected lazy val cleanArgs: Seq[Any] = {
+    def cleanArg(arg: Any): Any = arg match {
+      case e: Expression => cleanExpression(e).canonicalized
+      case other => other
+    }
+
+    productIterator.map {
+      // Children are checked using sameResult above.
+      case tn: TreeNode[_] if containsChild(tn) => null
+      case e: Expression => cleanArg(e)
+      case s: Option[_] => s.map(cleanArg)
+      case s: Seq[_] => s.map(cleanArg)
+      case m: Map[_, _] => m.mapValues(cleanArg)
+      case other => other
+    }.toSeq
+  }
 }
```
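
The canonicalize-then-compare idea can be illustrated outside Spark: normalize away cosmetic details first, then compare class, cleaned arguments, and children recursively. A toy sketch (plain Scala, not Spark code; all names here are illustrative):

```scala
// Toy plan hierarchy: `args` deliberately excludes cosmetic fields (like alias),
// mirroring how cleanArgs erases expression ids before comparison.
sealed trait Plan { def children: Seq[Plan]; def args: Seq[Any] }
case class Scan(table: String, alias: String) extends Plan {
  def children = Nil
  def args = Seq(table)          // alias is cosmetic, so it is not compared
}
case class Join(left: Plan, right: Plan, key: String) extends Plan {
  def children = Seq(left, right)
  def args = Seq(key)
}

def sameResult(a: Plan, b: Plan): Boolean =
  a.getClass == b.getClass &&
    a.args == b.args &&
    a.children.size == b.children.size &&
    (a.children, b.children).zipped.forall(sameResult)

// These two scans differ only cosmetically, so they compare as same-result:
assert(sameResult(Scan("t", "x"), Scan("t", "y")))
```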

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 1 addition & 54 deletions

```diff
@@ -114,60 +114,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
    */
   def childrenResolved: Boolean = children.forall(_.resolved)
 
-  /**
-   * Returns true when the given logical plan will return the same results as this logical plan.
-   *
-   * Since its likely undecidable to generally determine if two given plans will produce the same
-   * results, it is okay for this function to return false, even if the results are actually
-   * the same. Such behavior will not affect correctness, only the application of performance
-   * enhancements like caching. However, it is not acceptable to return true if the results could
-   * possibly be different.
-   *
-   * By default this function performs a modified version of equality that is tolerant of cosmetic
-   * differences like attribute naming and or expression id differences. Logical operators that
-   * can do better should override this function.
-   */
-  def sameResult(plan: LogicalPlan): Boolean = {
-    val cleanLeft = EliminateSubqueryAliases(this)
-    val cleanRight = EliminateSubqueryAliases(plan)
-
-    cleanLeft.getClass == cleanRight.getClass &&
-      cleanLeft.children.size == cleanRight.children.size && {
-        logDebug(
-          s"[${cleanRight.cleanArgs.mkString(", ")}] == [${cleanLeft.cleanArgs.mkString(", ")}]")
-        cleanRight.cleanArgs == cleanLeft.cleanArgs
-      } &&
-      (cleanLeft.children, cleanRight.children).zipped.forall(_ sameResult _)
-  }
-
-  /** Args that have cleaned such that differences in expression id should not affect equality */
-  protected lazy val cleanArgs: Seq[Any] = {
-    val input = children.flatMap(_.output)
-    def cleanExpression(e: Expression) = e match {
-      case a: Alias =>
-        // As the root of the expression, Alias will always take an arbitrary exprId, we need
-        // to erase that for equality testing.
-        val cleanedExprId =
-          Alias(a.child, a.name)(ExprId(-1), a.qualifiers, isGenerated = a.isGenerated)
-        BindReferences.bindReference(cleanedExprId, input, allowFailures = true)
-      case other => BindReferences.bindReference(other, input, allowFailures = true)
-    }
-
-    productIterator.map {
-      // Children are checked using sameResult above.
-      case tn: TreeNode[_] if containsChild(tn) => null
-      case e: Expression => cleanExpression(e)
-      case s: Option[_] => s.map {
-        case e: Expression => cleanExpression(e)
-        case other => other
-      }
-      case s: Seq[_] => s.map {
-        case e: Expression => cleanExpression(e)
-        case other => other
-      }
-      case other => other
-    }.toSeq
-  }
+  override lazy val canonicalized: LogicalPlan = EliminateSubqueryAliases(this)
 
   /**
    * Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala

Lines changed: 9 additions & 0 deletions

```diff
@@ -25,6 +25,11 @@ import org.apache.spark.sql.catalyst.InternalRow
  */
 trait BroadcastMode {
   def transform(rows: Array[InternalRow]): Any
+
+  /**
+   * Returns true iff this [[BroadcastMode]] generates the same result as `other`.
+   */
+  def compatibleWith(other: BroadcastMode): Boolean
 }
 
 /**
@@ -33,4 +38,8 @@ trait BroadcastMode {
 case object IdentityBroadcastMode extends BroadcastMode {
   // TODO: pack the UnsafeRows into single bytes array.
   override def transform(rows: Array[InternalRow]): Array[InternalRow] = rows
+
+  override def compatibleWith(other: BroadcastMode): Boolean = {
+    this eq other
+  }
 }
```
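
For the singleton `IdentityBroadcastMode`, reference equality is exact. A mode that carries structure would instead compare that structure; a hypothetical sketch against the trait above (the class name and field are illustrative, not Spark's actual hashed-relation mode):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode

// Illustrative only: a mode keyed on already-canonicalized key strings can
// compare those keys rather than relying on object identity.
case class KeyedBroadcastModeSketch(canonicalKeys: Seq[String]) extends BroadcastMode {
  override def transform(rows: Array[InternalRow]): Any = rows  // placeholder
  override def compatibleWith(other: BroadcastMode): Boolean = other match {
    case KeyedBroadcastModeSketch(otherKeys) => canonicalKeys == otherKeys
    case _ => false
  }
}
```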

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala

Lines changed: 19 additions & 3 deletions

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.execution.exchange.ReusedExchange
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
 import org.apache.spark.util.Utils
 
@@ -31,13 +32,28 @@ class SparkPlanInfo(
     val simpleString: String,
     val children: Seq[SparkPlanInfo],
     val metadata: Map[String, String],
-    val metrics: Seq[SQLMetricInfo])
+    val metrics: Seq[SQLMetricInfo]) {
+
+  override def hashCode(): Int = {
+    // hashCode of simpleString should be good enough to distinguish the plans from each other
+    // within a plan
+    simpleString.hashCode
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case o: SparkPlanInfo =>
+      nodeName == o.nodeName && simpleString == o.simpleString && children == o.children
+    case _ => false
+  }
+}
 
 private[sql] object SparkPlanInfo {
 
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
-
-    val children = plan.children ++ plan.subqueries
+    val children = plan match {
+      case ReusedExchange(_, child) => child :: Nil
+      case _ => plan.children ++ plan.subqueries
+    }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
         Utils.getFormattedClassName(metric.param))
```
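
Defining value equality this way lets consumers of the plan graph (such as the SQL UI) recognize that two `SparkPlanInfo` subtrees describe the same exchange node. A hypothetical check of the intended behavior (constructor arguments abbreviated, assuming the full parameter list starts with `nodeName`):

```scala
// Two infos describing the same exchange should compare equal and hash alike,
// so the graph can render the reused node once.
val a = new SparkPlanInfo("Exchange", "Exchange hashpartitioning(id#0L, 200)", Nil, Map.empty, Nil)
val b = new SparkPlanInfo("Exchange", "Exchange hashpartitioning(id#0L, 200)", Nil, Map.empty, Nil)
assert(a == b && a.hashCode == b.hashCode)
```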

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala

Lines changed: 4 additions & 0 deletions

```diff
@@ -46,6 +46,10 @@ case class TungstenAggregate(
 
   require(TungstenAggregate.supportsAggregate(aggregateBufferAttributes))
 
+  override lazy val allAttributes: Seq[Attribute] =
+    child.output ++ aggregateBufferAttributes ++ aggregateAttributes ++
+      aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
+
   override private[sql] lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
```

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 3 additions & 0 deletions

```diff
@@ -166,6 +166,9 @@ case class Range(
   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
+  // output attributes should not affect the results
+  override lazy val cleanArgs: Seq[Any] = Seq(start, step, numSlices, numElements)
+
   override def upstreams(): Seq[RDD[InternalRow]] = {
     sqlContext.sparkContext.parallelize(0 until numSlices, numSlices)
       .map(i => InternalRow(i)) :: Nil
```
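
The effect of this override is that two `Range` operators over the same numeric range compare as same-result even though their output attributes carry different expression ids. A hypothetical REPL-style check (assuming a `SQLContext` named `sqlContext`):

```scala
// Both plans scan [0, 100) with step 1 in 4 slices; only their output exprIds
// differ, which cleanArgs now deliberately ignores.
val plan1 = sqlContext.range(0, 100, 1, 4).queryExecution.sparkPlan
val plan2 = sqlContext.range(0, 100, 1, 4).queryExecution.sparkPlan
assert(plan1.sameResult(plan2))
```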

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala

Lines changed: 7 additions & 3 deletions

```diff
@@ -34,12 +34,16 @@ import org.apache.spark.util.ThreadUtils
  */
 case class BroadcastExchange(
     mode: BroadcastMode,
-    child: SparkPlan) extends UnaryNode {
-
-  override def output: Seq[Attribute] = child.output
+    child: SparkPlan) extends Exchange {
 
   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
 
+  override def sameResult(plan: SparkPlan): Boolean = plan match {
+    case p: BroadcastExchange =>
+      mode.compatibleWith(p.mode) && child.sameResult(p.child)
+    case _ => false
+  }
+
   @transient
   private val timeout: Duration = {
     val timeoutValue = sqlContext.conf.broadcastTimeout
```

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala

Lines changed: 92 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.exchange

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
import org.apache.spark.sql.types.StructType

/**
 * An interface for exchanges.
 */
abstract class Exchange extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

/**
 * A wrapper for reused exchange to have different output, because two exchanges which produce
 * logically identical output will have distinct sets of output attribute ids, so we need to
 * preserve the original ids because they're what downstream operators are expecting.
 */
case class ReusedExchange(override val output: Seq[Attribute], child: Exchange) extends LeafNode {

  override def sameResult(plan: SparkPlan): Boolean = {
    // Ignore this wrapper. `plan` could also be a ReusedExchange, so we reverse the order here.
    plan.sameResult(child)
  }

  def doExecute(): RDD[InternalRow] = {
    child.execute()
  }

  override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
    child.executeBroadcast()
  }

  // Do not repeat the same tree in explain.
  override def treeChildren: Seq[SparkPlan] = Nil
}

/**
 * Find out duplicated exchanges in the spark plan, then use the same exchange for all the
 * references.
 */
private[sql] case class ReuseExchange(sqlContext: SQLContext) extends Rule[SparkPlan] {

  def apply(plan: SparkPlan): SparkPlan = {
    if (!sqlContext.conf.exchangeReuseEnabled) {
      return plan
    }
    // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
    val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
    plan.transformUp {
      case exchange: Exchange =>
        // the exchanges that have same results usually also have same schemas (same column names).
        val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
        val samePlan = sameSchema.find { e =>
          exchange.sameResult(e)
        }
        if (samePlan.isDefined) {
          // Keep the output of this exchange, the following plans require that to resolve
          // attributes.
          ReusedExchange(exchange.output, samePlan.get)
        } else {
          sameSchema += exchange
          exchange
        }
    }
  }
}
```
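
The bucketing trick in `ReuseExchange` generalizes: hash candidates by a cheap key (here the schema) so the expensive pairwise `sameResult` check only runs within each bucket. A standalone sketch of that pattern (plain Scala, not Spark code; the function name is illustrative):

```scala
import scala.collection.mutable

// Keep the first representative of each equivalence class. For correctness,
// `key` must be consistent with `same`: equivalent items always share a key.
def firstRepresentatives[A, K](items: Seq[A])(key: A => K)(same: (A, A) => Boolean): Seq[A] = {
  val buckets = mutable.HashMap.empty[K, mutable.ArrayBuffer[A]]
  items.flatMap { item =>
    val bucket = buckets.getOrElseUpdate(key(item), mutable.ArrayBuffer.empty[A])
    if (bucket.exists(same(item, _))) {
      None                 // a duplicate: callers would reuse the earlier item
    } else {
      bucket += item
      Some(item)
    }
  }
}
```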

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala

Lines changed: 18 additions & 11 deletions

```diff
@@ -38,7 +38,7 @@ import org.apache.spark.util.MutablePair
 case class ShuffleExchange(
     var newPartitioning: Partitioning,
     child: SparkPlan,
-    @transient coordinator: Option[ExchangeCoordinator]) extends UnaryNode {
+    @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
 
   override def nodeName: String = {
     val extraInfo = coordinator match {
@@ -55,8 +55,6 @@
 
   override def outputPartitioning: Partitioning = newPartitioning
 
-  override def output: Seq[Attribute] = child.output
-
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
 
   override protected def doPrepare(): Unit = {
@@ -103,16 +101,25 @@
     new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
   }
 
+  /**
+   * Caches the created ShuffleRowRDD so we can reuse that.
+   */
+  private var cachedShuffleRDD: ShuffledRowRDD = null
+
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
-    coordinator match {
-      case Some(exchangeCoordinator) =>
-        val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
-        assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
-        shuffleRDD
-      case None =>
-        val shuffleDependency = prepareShuffleDependency()
-        preparePostShuffleRDD(shuffleDependency)
+    // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
+    if (cachedShuffleRDD == null) {
+      cachedShuffleRDD = coordinator match {
+        case Some(exchangeCoordinator) =>
+          val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
+          assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
+          shuffleRDD
+        case None =>
+          val shuffleDependency = prepareShuffleDependency()
+          preparePostShuffleRDD(shuffleDependency)
+      }
     }
+    cachedShuffleRDD
   }
 }
```
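
With the cache in place, calling `execute()` on one `ShuffleExchange` node from several parent operators yields a single shuffle. A hypothetical check (assuming `exchange` is an already-prepared `ShuffleExchange` instance from a planned query):

```scala
// doExecute() populates cachedShuffleRDD on the first call and returns the
// cached instance afterwards, so all parents share one ShuffledRowRDD.
val rdd1 = exchange.execute()
val rdd2 = exchange.execute()
assert(rdd1 eq rdd2)  // one shuffle, shared by every parent
```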
