
Commit 7df43ca

Author: Davies Liu (committed)
Message: address comments
1 parent 679d669 commit 7df43ca

File tree

7 files changed: +18 / -15 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 4 additions & 4 deletions
@@ -239,9 +239,9 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
   override def innerChildren: Seq[PlanType] = subqueries
 
   /**
-   * Cleaned copy of this query plan.
+   * Canonicalized copy of this query plan.
    */
-  protected lazy val cleaned: PlanType = this
+  protected lazy val canonicalized: PlanType = this
 
   /**
    * Returns true when the given query plan will return the same results as this query plan.
@@ -257,8 +257,8 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    * can do better should override this function.
    */
   def sameResult(plan: PlanType): Boolean = {
-    val cleanLeft = this.cleaned
-    val cleanRight = plan.cleaned
+    val cleanLeft = this.canonicalized
+    val cleanRight = plan.canonicalized
     cleanLeft.getClass == cleanRight.getClass &&
       cleanLeft.children.size == cleanRight.children.size &&
       cleanLeft.cleanArgs == cleanRight.cleanArgs &&
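
Note on the pattern: sameResult never compares two plans directly, it compares their canonicalized forms, and subclasses opt in by overriding canonicalized (as LogicalPlan does below with EliminateSubqueryAliases). The following is a minimal, self-contained sketch of that canonicalize-then-compare idea using hypothetical ToyPlan/Scan/Alias classes, not Spark's QueryPlan API; the args field is a simplified stand-in for cleanArgs.

// Hypothetical ToyPlan hierarchy; mirrors the shape of QueryPlan.sameResult above.
abstract class ToyPlan {
  def children: Seq[ToyPlan]
  // Result-relevant constructor arguments, a simplified stand-in for cleanArgs.
  def args: Seq[Any]
  // Subclasses override this to strip cosmetic-only wrappers.
  lazy val canonicalized: ToyPlan = this

  def sameResult(other: ToyPlan): Boolean = {
    val cleanLeft = this.canonicalized
    val cleanRight = other.canonicalized
    cleanLeft.getClass == cleanRight.getClass &&
      cleanLeft.args == cleanRight.args &&
      cleanLeft.children.size == cleanRight.children.size &&
      cleanLeft.children.zip(cleanRight.children).forall { case (l, r) => l.sameResult(r) }
  }
}

case class Scan(table: String) extends ToyPlan {
  def children: Seq[ToyPlan] = Nil
  def args: Seq[Any] = Seq(table)
}

case class Alias(name: String, child: ToyPlan) extends ToyPlan {
  def children: Seq[ToyPlan] = Seq(child)
  def args: Seq[Any] = Nil // the alias name is cosmetic
  // Canonicalization drops the alias entirely, cf. EliminateSubqueryAliases below.
  override lazy val canonicalized: ToyPlan = child.canonicalized
}

object ToyDemo extends App {
  val left = Alias("t1", Scan("events"))
  val right = Alias("t2", Scan("events"))
  println(left.sameResult(right))               // true: alias names differ, canonical plans match
  println(left.sameResult(Alias("t3", Scan("other"))))  // false: different table
}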

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 1 addition & 1 deletion
@@ -114,7 +114,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
    */
   def childrenResolved: Boolean = children.forall(_.resolved)
 
-  override lazy val cleaned: LogicalPlan = EliminateSubqueryAliases(this)
+  override lazy val canonicalized: LogicalPlan = EliminateSubqueryAliases(this)
 
   /**
    * Optionally resolves the given strings to a [[NamedExpression]] using the input from all child

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala

Lines changed: 2 additions & 0 deletions
@@ -35,6 +35,8 @@ class SparkPlanInfo(
     val metrics: Seq[SQLMetricInfo]) {
 
   override def hashCode(): Int = {
+    // hashCode of simpleString should be good enough to distinguish the plans from each other
+    // within a plan
     simpleString.hashCode
   }
 
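
Note: hashing on the rendered simpleString is what lets SparkPlanInfo nodes be told apart when the UI maps plan-info objects to graph nodes. A general sketch of keeping hashCode and equals aligned on the same rendered string, using a hypothetical FakeNodeInfo class (the real SparkPlanInfo's equality is not shown in this diff):

// Hypothetical FakeNodeInfo; illustrates keying a node on its rendered description.
class FakeNodeInfo(val simpleString: String) {
  // Cheap and stable: distinct nodes within one plan render to different simple strings.
  override def hashCode(): Int = simpleString.hashCode
  // Keep equals aligned with hashCode so instances behave well as hash-map keys.
  override def equals(other: Any): Boolean = other match {
    case o: FakeNodeInfo => simpleString == o.simpleString
    case _ => false
  }
}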

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala

Lines changed: 5 additions & 4 deletions
@@ -37,8 +37,9 @@ abstract class Exchange extends UnaryNode {
 }
 
 /**
- * A wrapper for reused exchange to have different output, which is required to resolve the
- * attributes in following plans.
+ * A wrapper for reused exchange to have different output, because two exchanges which produce
+ * logically identical output will have distinct sets of output attribute ids, so we need to
+ * preserve the original ids because they're what downstream operators are expecting.
  */
 case class ReusedExchange(override val output: Seq[Attribute], child: Exchange) extends LeafNode {
 
@@ -73,15 +74,15 @@ private[sql] case class ReuseExchange(sqlContext: SQLContext) extends Rule[Spark
     val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
     plan.transformUp {
       case exchange: Exchange =>
+        // the exchanges that have same results usually also have same schemas (same column names).
         val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
         val samePlan = sameSchema.find { e =>
           exchange.sameResult(e)
         }
         if (samePlan.isDefined) {
           // Keep the output of this exchange, the following plans require that to resolve
           // attributes.
-          val reused = ReusedExchange(exchange.output, samePlan.get)
-          reused
+          ReusedExchange(exchange.output, samePlan.get)
         } else {
           sameSchema += exchange
           exchange
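
Note on the rule's structure: exchanges are first bucketed by schema (a cheap hash lookup), and only candidates within the same bucket go through the more expensive sameResult check; a duplicate is then wrapped in ReusedExchange so its own output attribute ids survive for downstream operators. Below is a standalone sketch of that bucket-then-compare dedup pattern; FakeExchange and resultKey are hypothetical stand-ins for Spark's Exchange type and sameResult check.

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object ReuseSketch extends App {
  // Hypothetical stand-in for Exchange: resultKey plays the role of sameResult.
  case class FakeExchange(schema: String, resultKey: String)

  def deduplicate(plans: Seq[FakeExchange]): Seq[String] = {
    // Bucket by schema first: exchanges producing the same result usually share a
    // schema, so the expensive comparison only runs within a small bucket.
    val seen = mutable.HashMap[String, ArrayBuffer[FakeExchange]]()
    plans.map { exchange =>
      val sameSchema = seen.getOrElseUpdate(exchange.schema, ArrayBuffer[FakeExchange]())
      sameSchema.find(_.resultKey == exchange.resultKey) match {
        case Some(existing) =>
          // In Spark this would become ReusedExchange(exchange.output, existing),
          // keeping this exchange's own output attribute ids for downstream operators.
          s"reuse of $existing"
        case None =>
          sameSchema += exchange
          s"keep $exchange"
      }
    }
  }

  val plans = Seq(FakeExchange("a: int", "scan-a"), FakeExchange("a: int", "scan-a"))
  deduplicate(plans).foreach(println)
  // keep FakeExchange(a: int,scan-a)
  // reuse of FakeExchange(a: int,scan-a)
}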

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala

Lines changed: 4 additions & 4 deletions
@@ -104,12 +104,12 @@ case class ShuffleExchange(
   /**
    * Caches the created ShuffleRowRDD so we can reuse that.
    */
-  private var shuffleRDD: ShuffledRowRDD = null
+  private var cachedShuffleRDD: ShuffledRowRDD = null
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
-    if (shuffleRDD == null) {
-      shuffleRDD = coordinator match {
+    if (cachedShuffleRDD == null) {
+      cachedShuffleRDD = coordinator match {
         case Some(exchangeCoordinator) =>
           val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
           assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
@@ -119,7 +119,7 @@ case class ShuffleExchange(
           preparePostShuffleRDD(shuffleDependency)
       }
     }
-    shuffleRDD
+    cachedShuffleRDD
   }
 }
 
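
Note on the caching idiom: doExecute builds the shuffle RDD once and returns the same instance on every later call, which is what lets a reused exchange avoid planning a second shuffle. A tiny self-contained sketch of that cache-on-first-execute idiom with placeholder types (not the real ShuffledRowRDD or coordinator machinery):

// Placeholder types; only the cache-on-first-execute idiom is the point here.
class FakeShuffledRDD(val id: Int)

class FakeExchangeNode {
  // Caches the RDD created on the first execute so later callers reuse it.
  private var cachedShuffleRDD: FakeShuffledRDD = null

  def doExecute(): FakeShuffledRDD = {
    if (cachedShuffleRDD == null) {
      // The expensive work (planning the shuffle) happens only once.
      cachedShuffleRDD = new FakeShuffledRDD(scala.util.Random.nextInt())
    }
    cachedShuffleRDD
  }
}

object CacheSketch extends App {
  val node = new FakeExchangeNode
  // Both callers see the same instance, so the shuffle is not re-planned.
  assert(node.doExecute() eq node.doExecute())
}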

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala

Lines changed: 1 addition & 2 deletions
@@ -104,8 +104,7 @@ private[sql] object SparkPlanGraph {
       } else {
         subgraph.nodes += node
       }
-      // ShuffleExchange or BroadcastExchange
-      if (name.endsWith("Exchange")) {
+      if (name == "ShuffleExchange" || name == "BroadcastExchange") {
         exchanges += planInfo -> node
       }
 

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -1341,6 +1341,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val df = sqlContext.range(100)
     val agg1 = df.groupBy().count()
     val agg2 = df.groupBy().count()
+    // two aggregates with different ExprId within them should have same result
     agg1.queryExecution.executedPlan.sameResult(agg2.queryExecution.executedPlan)
   }
 
