Skip to content

Commit f6899a0

Browse files
committed
fix review findings
1 parent db4e04a commit f6899a0

File tree

2 files changed

+17
-21
lines changed

2 files changed

+17
-21
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
136136
} else {
137137
removeReferences(mergedPlan.plan, subqueryPlansByLevel)
138138
}
139-
if (mergedPlan.merged) {
139+
if (mergedPlan.merged && mergedPlan.plan.output.size > 1) {
140140
CTERelationDef(
141141
Project(
142142
Seq(Alias(
@@ -177,19 +177,19 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
177177
case s: ScalarSubquery if !s.isCorrelated && s.deterministic =>
178178
val (planWithReferences, level) = insertReferences(s.plan, planMergers)
179179

180-
while (level >= planMergers.size) planMergers += PlanMerger()
180+
while (level >= planMergers.size) planMergers += new PlanMerger()
181181
// The subquery could contain a hint that is not propagated once we merge it, but as a
182182
// non-correlated scalar subquery won't be turned into a Join the loss of hints is fine.
183-
val planMergeResult = planMergers(level).merge(planWithReferences)
183+
val mergeResult = planMergers(level).merge(planWithReferences)
184184

185185
maxLevel = maxLevel.max(level + 1)
186186

187-
val mergedOutput = planMergeResult.outputMap(planWithReferences.output.head)
187+
val mergedOutput = mergeResult.outputMap(planWithReferences.output.head)
188188
val headerIndex =
189-
planMergeResult.mergedPlan.output.indexWhere(_.exprId == mergedOutput.exprId)
189+
mergeResult.mergedPlan.plan.output.indexWhere(_.exprId == mergedOutput.exprId)
190190
ScalarSubqueryReference(
191191
level,
192-
planMergeResult.mergedPlanIndex,
192+
mergeResult.mergedPlanIndex,
193193
headerIndex,
194194
s.dataType,
195195
s.exprId)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PlanMerger.scala

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,13 @@ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, Log
3131
* - A newly merged plan combining the input with a cached plan
3232
* - The original input plan (if no merge was possible)
3333
* @param mergedPlanIndex The index of this plan in the PlanMerger's cache.
34-
* @param merged Whether the plan was merged with an existing cached plan (true) or
35-
* is a new entry (false).
3634
* @param outputMap Maps attributes from the input plan to corresponding attributes in
3735
* `mergedPlan`. Used to rewrite expressions referencing the original plan
3836
* to reference the merged plan instead.
3937
*/
4038
case class MergeResult(
41-
mergedPlan: LogicalPlan,
39+
mergedPlan: MergedPlan,
4240
mergedPlanIndex: Int,
43-
merged: Boolean,
4441
outputMap: AttributeMap[Attribute])
4542

4643
/**
@@ -75,7 +72,7 @@ case class MergedPlan(plan: LogicalPlan, merged: Boolean)
7572
* val merger = PlanMerger()
7673
* val result1 = merger.merge(plan1) // Adds plan1 to cache
7774
* val result2 = merger.merge(plan2) // Merges with plan1 if compatible
78-
* // result2.merged == true if plans were merged
75+
* // result2.mergedPlan.merged == true if plans were merged
7976
* // result2.outputMap maps plan2's attributes to the merged plan's attributes
8077
* }}}
8178
*/
@@ -94,26 +91,29 @@ class PlanMerger {
9491
* @return A [[MergeResult]] containing:
9592
* - The merged/cached plan to use
9693
* - Its index in the cache
97-
* - Whether it was merged with an existing plan
9894
* - An attribute mapping for rewriting expressions
9995
*/
10096
def merge(plan: LogicalPlan): MergeResult = {
10197
cache.zipWithIndex.collectFirst(Function.unlift {
10298
case (mp, i) =>
10399
checkIdenticalPlans(plan, mp.plan).map { outputMap =>
104-
MergeResult(mp.plan, i, true, outputMap)
100+
val newMergePlan = MergedPlan(mp.plan, true)
101+
cache(i) = newMergePlan
102+
MergeResult(newMergePlan, i, outputMap)
105103
}.orElse {
106104
tryMergePlans(plan, mp.plan).map {
107105
case (mergedPlan, outputMap) =>
108-
cache(i) = MergedPlan(mergedPlan, true)
109-
MergeResult(mergedPlan, i, true, outputMap)
106+
val newMergePlan = MergedPlan(mergedPlan, true)
107+
cache(i) = newMergePlan
108+
MergeResult(newMergePlan, i, outputMap)
110109
}
111110
}
112111
case _ => None
113112
}).getOrElse {
114-
cache += MergedPlan(plan, false)
113+
val newMergePlan = MergedPlan(plan, false)
114+
cache += newMergePlan
115115
val outputMap = AttributeMap(plan.output.map(a => a -> a))
116-
MergeResult(plan, cache.length - 1, false, outputMap)
116+
MergeResult(newMergePlan, cache.length - 1, outputMap)
117117
}
118118
}
119119

@@ -297,7 +297,3 @@ class PlanMerger {
297297
}
298298
}
299299
}
300-
301-
object PlanMerger {
302-
def apply(): PlanMerger = new PlanMerger
303-
}

0 commit comments

Comments
 (0)