
Commit 759dda6 (1 parent: 9d46a60)
Commit message: review

5 files changed: +331, -186 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 123 additions & 104 deletions
@@ -124,6 +124,127 @@ object AnalysisContext {
   }
 }
 
+object Analyzer {
+
+  /**
+   * Rewrites a given `plan` recursively based on rewrite mappings from old plans to new ones.
+   * This method also updates all the related references in the `plan` accordingly.
+   *
+   * @param plan the plan to rewrite
+   * @param rewritePlanMap mappings from old plans to new ones for the given `plan`
+   * @return a rewritten plan and the updated references related to the root node of
+   *         the given `plan`
+   */
+  def rewritePlan(plan: LogicalPlan, rewritePlanMap: Map[LogicalPlan, LogicalPlan])
+    : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
+    if (plan.resolved) {
+      val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
+      val newChildren = plan.children.map { child =>
+        // Rewrite each child plan recursively until we find a node in
+        // `rewritePlanMap` or reach a leaf node.
+        val (newChild, childAttrMapping) = rewritePlan(child, rewritePlanMap)
+        attrMapping ++= childAttrMapping.filter { case (oldAttr, _) =>
+          // `attrMapping` is not only used to replace the attributes of the current `plan`,
+          // but also to be propagated to the parent plans of the current `plan`. Therefore,
+          // the `oldAttr` must be part of either `plan.references` (so that it can be used to
+          // replace attributes of the current `plan`) or `plan.outputSet` (so that it can be
+          // used by those parent plans).
+          (plan.outputSet ++ plan.references).contains(oldAttr)
+        }
+        newChild
+      }
+
+      val newPlan = if (rewritePlanMap.contains(plan)) {
+        rewritePlanMap(plan).withNewChildren(newChildren)
+      } else {
+        plan.withNewChildren(newChildren)
+      }
+
+      assert(!attrMapping.groupBy(_._1.exprId)
+        .exists(_._2.map(_._2.exprId).distinct.length > 1),
+        "Found duplicate rewrite attributes")
+
+      val attributeRewrites = AttributeMap(attrMapping)
+      // Using attrMapping from the children plans to rewrite their parent node.
+      // Note that we shouldn't rewrite a node using attrMapping from its sibling nodes.
+      val p = newPlan.transformExpressions {
+        case a: Attribute =>
+          updateAttr(a, attributeRewrites)
+        case s: SubqueryExpression =>
+          s.withNewPlan(updateOuterReferencesInSubquery(s.plan, attributeRewrites))
+      }
+      attrMapping ++= plan.output.zip(p.output)
+        .filter { case (a1, a2) => a1.exprId != a2.exprId }
+      p -> attrMapping
+    } else {
+      // Just pass unresolved nodes through.
+      plan.mapChildren {
+        rewritePlan(_, rewritePlanMap)._1
+      } -> Nil
+    }
+  }
+
+  private def updateAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = {
+    val exprId = attrMap.getOrElse(attr, attr).exprId
+    attr.withExprId(exprId)
+  }
+
+  /**
+   * The outer plan may have old references, and the function below updates the
+   * outer references to refer to the new attributes.
+   *
+   * For example (SQL):
+   * {{{
+   *   SELECT * FROM t1
+   *   INTERSECT
+   *   SELECT * FROM t1
+   *   WHERE EXISTS (SELECT 1
+   *                 FROM t2
+   *                 WHERE t1.c1 = t2.c1)
+   * }}}
+   * Plan before the resolveReference rule:
+   *    'Intersect
+   *    :- Project [c1#245, c2#246]
+   *    :  +- SubqueryAlias t1
+   *    :     +- Relation[c1#245,c2#246] parquet
+   *    +- 'Project [*]
+   *       +- Filter exists#257 [c1#245]
+   *          :  +- Project [1 AS 1#258]
+   *          :     +- Filter (outer(c1#245) = c1#251)
+   *          :        +- SubqueryAlias t2
+   *          :           +- Relation[c1#251,c2#252] parquet
+   *          +- SubqueryAlias t1
+   *             +- Relation[c1#245,c2#246] parquet
+   * Plan after the resolveReference rule:
+   *    Intersect
+   *    :- Project [c1#245, c2#246]
+   *    :  +- SubqueryAlias t1
+   *    :     +- Relation[c1#245,c2#246] parquet
+   *    +- Project [c1#259, c2#260]
+   *       +- Filter exists#257 [c1#259]
+   *          :  +- Project [1 AS 1#258]
+   *          :     +- Filter (outer(c1#259) = c1#251) => Updated
+   *          :        +- SubqueryAlias t2
+   *          :           +- Relation[c1#251,c2#252] parquet
+   *          +- SubqueryAlias t1
+   *             +- Relation[c1#259,c2#260] parquet => Outer plan's attributes are rewritten.
+   */
+  private def updateOuterReferencesInSubquery(
+      plan: LogicalPlan,
+      attrMap: AttributeMap[Attribute]): LogicalPlan = {
+    AnalysisHelper.allowInvokingTransformsInAnalyzer {
+      plan transformDown { case currentFragment =>
+        currentFragment transformExpressions {
+          case OuterReference(a: Attribute) =>
+            OuterReference(updateAttr(a, attrMap))
+          case s: SubqueryExpression =>
+            s.withNewPlan(updateOuterReferencesInSubquery(s.plan, attrMap))
+        }
+      }
+    }
+  }
+}
+
 /**
  * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
  * [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]].
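Review note: a minimal usage sketch of the new shared entry point. The names `oldRel` and `right` here are hypothetical, not from this commit; `newInstance()` is the usual way to re-create a `MultiInstanceRelation` leaf with fresh ExprIds.

    // Map each conflicting sub-plan to a fresh copy, then let rewritePlan
    // patch every reference above it in one pass.
    val rewriteMap: Map[LogicalPlan, LogicalPlan] = Map(oldRel -> oldRel.newInstance())
    val (rewritten, attrPairs) = Analyzer.rewritePlan(right, rewriteMap)
    // attrPairs carries the (oldAttr, newAttr) pairs still visible at the root,
    // so an enclosing plan can keep rewriting its own references if needed.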
@@ -137,7 +258,7 @@ class Analyzer(
   private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog
 
   override protected def isPlanIntegral(plan: LogicalPlan): Boolean = {
-    !Utils.isTesting || LogicalPlanIntegrity.hasUniqueExprIdsForAttributes(plan)
+    !Utils.isTesting || LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(plan)
   }
 
   override def isView(nameParts: Seq[String]): Boolean = v1SessionCatalog.isView(nameParts)
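Review note: this hunk (and its twin in Optimizer.scala below) only renames the integrity check. As a rough, assumed illustration of what a global ExprId uniqueness check amounts to (this is not the actual LogicalPlanIntegrity implementation):

    // Illustrative sketch only: an ExprId should denote a single attribute;
    // two distinct attributes sharing an id usually means some rule forgot
    // to call newInstance() on an Alias or relation it duplicated.
    def exprIdsLookGloballyUnique(plan: LogicalPlan): Boolean =
      plan.collect { case p => p.output }.flatten
        .groupBy(_.exprId)
        .forall { case (_, attrs) =>
          attrs.map(a => (a.name, a.dataType)).distinct.size == 1
        }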
@@ -1256,109 +1377,7 @@ class Analyzer(
     if (conflictPlans.isEmpty) {
       right
     } else {
-      rewritePlan(right, conflictPlans.toMap)._1
-    }
-  }
-
-  private def rewritePlan(plan: LogicalPlan, conflictPlanMap: Map[LogicalPlan, LogicalPlan])
-    : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
-    if (conflictPlanMap.contains(plan)) {
-      // If the plan is the one that conflict the with left one, we'd
-      // just replace it with the new plan and collect the rewrite
-      // attributes for the parent node.
-      val newRelation = conflictPlanMap(plan)
-      newRelation -> plan.output.zip(newRelation.output)
-    } else {
-      val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
-      val newPlan = plan.mapChildren { child =>
-        // If not, we'd rewrite child plan recursively until we find the
-        // conflict node or reach the leaf node.
-        val (newChild, childAttrMapping) = rewritePlan(child, conflictPlanMap)
-        attrMapping ++= childAttrMapping.filter { case (oldAttr, _) =>
-          // `attrMapping` is not only used to replace the attributes of the current `plan`,
-          // but also to be propagated to the parent plans of the current `plan`. Therefore,
-          // the `oldAttr` must be part of either `plan.references` (so that it can be used to
-          // replace attributes of the current `plan`) or `plan.outputSet` (so that it can be
-          // used by those parent plans).
-          (plan.outputSet ++ plan.references).contains(oldAttr)
-        }
-        newChild
-      }
-
-      if (attrMapping.isEmpty) {
-        newPlan -> attrMapping.toSeq
-      } else {
-        assert(!attrMapping.groupBy(_._1.exprId)
-          .exists(_._2.map(_._2.exprId).distinct.length > 1),
-          "Found duplicate rewrite attributes")
-        val attributeRewrites = AttributeMap(attrMapping.toSeq)
-        // Using attrMapping from the children plans to rewrite their parent node.
-        // Note that we shouldn't rewrite a node using attrMapping from its sibling nodes.
-        newPlan.transformExpressions {
-          case a: Attribute =>
-            dedupAttr(a, attributeRewrites)
-          case s: SubqueryExpression =>
-            s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
-        } -> attrMapping.toSeq
-      }
-    }
-  }
-
-  private def dedupAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = {
-    val exprId = attrMap.getOrElse(attr, attr).exprId
-    attr.withExprId(exprId)
-  }
-
-  /**
-   * The outer plan may have been de-duplicated and the function below updates the
-   * outer references to refer to the de-duplicated attributes.
-   *
-   * For example (SQL):
-   * {{{
-   *   SELECT * FROM t1
-   *   INTERSECT
-   *   SELECT * FROM t1
-   *   WHERE EXISTS (SELECT 1
-   *                 FROM t2
-   *                 WHERE t1.c1 = t2.c1)
-   * }}}
-   * Plan before resolveReference rule.
-   *    'Intersect
-   *    :- Project [c1#245, c2#246]
-   *    :  +- SubqueryAlias t1
-   *    :     +- Relation[c1#245,c2#246] parquet
-   *    +- 'Project [*]
-   *       +- Filter exists#257 [c1#245]
-   *          :  +- Project [1 AS 1#258]
-   *          :     +- Filter (outer(c1#245) = c1#251)
-   *          :        +- SubqueryAlias t2
-   *          :           +- Relation[c1#251,c2#252] parquet
-   *          +- SubqueryAlias t1
-   *             +- Relation[c1#245,c2#246] parquet
-   * Plan after the resolveReference rule.
-   *    Intersect
-   *    :- Project [c1#245, c2#246]
-   *    :  +- SubqueryAlias t1
-   *    :     +- Relation[c1#245,c2#246] parquet
-   *    +- Project [c1#259, c2#260]
-   *       +- Filter exists#257 [c1#259]
-   *          :  +- Project [1 AS 1#258]
-   *          :     +- Filter (outer(c1#259) = c1#251) => Updated
-   *          :        +- SubqueryAlias t2
-   *          :           +- Relation[c1#251,c2#252] parquet
-   *          +- SubqueryAlias t1
-   *             +- Relation[c1#259,c2#260] parquet => Outer plan's attributes are de-duplicated.
-   */
-  private def dedupOuterReferencesInSubquery(
-      plan: LogicalPlan,
-      attrMap: AttributeMap[Attribute]): LogicalPlan = {
-    plan transformDown { case currentFragment =>
-      currentFragment transformExpressions {
-        case OuterReference(a: Attribute) =>
-          OuterReference(dedupAttr(a, attrMap))
-        case s: SubqueryExpression =>
-          s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attrMap))
-      }
+      Analyzer.rewritePlan(right, conflictPlans.toMap)._1
     }
   }
 
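Review note: the block removed above is the old dedupRight-private implementation; the commit generalizes it into `object Analyzer` (first hunk) so the optimizer rule below can reuse it. The shared version differs in two visible ways: unresolved nodes are now passed through untouched, and plans absent from the map still get `withNewChildren` applied, so the helper accepts arbitrary old-to-new plan maps rather than only conflict relations; the subquery walk is additionally wrapped in `AnalysisHelper.allowInvokingTransformsInAnalyzer`.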
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 32 additions & 19 deletions
@@ -45,8 +45,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
   // - only host special expressions in supported operators
   override protected def isPlanIntegral(plan: LogicalPlan): Boolean = {
     !Utils.isTesting || (plan.resolved &&
-      plan.find(PlanHelper.specialExpressionsInUnsupportedOperator(_).nonEmpty).isEmpty) &&
-      LogicalPlanIntegrity.hasUniqueExprIdsForAttributes(plan)
+      plan.find(PlanHelper.specialExpressionsInUnsupportedOperator(_).nonEmpty).isEmpty &&
+      LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(plan))
   }
 
   override protected val excludedOnceBatches: Set[String] =
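Review note: since `&&` binds tighter than `||`, the old and new groupings here are equivalent; moving the closing parenthesis only makes the intended conjunction explicit, so the substantive change in this hunk is the renamed integrity check.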
@@ -1580,23 +1580,36 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
  * Replaces logical [[Deduplicate]] operator with an [[Aggregate]] operator.
  */
 object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Deduplicate(keys, child) if !child.isStreaming =>
-      val keyExprIds = keys.map(_.exprId)
-      val aggCols = child.output.map { attr =>
-        if (keyExprIds.contains(attr.exprId)) {
-          attr
-        } else {
-          Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
-        }
-      }
-      // SPARK-22951: Physical aggregate operators distinguishes global aggregation and grouping
-      // aggregations by checking the number of grouping keys. The key difference here is that a
-      // global aggregation always returns at least one row even if there are no input rows. Here
-      // we append a literal when the grouping key list is empty so that the result aggregate
-      // operator is properly treated as a grouping aggregation.
-      val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
-      Aggregate(nonemptyKeys, aggCols, child)
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    val rewritePlanMap = mutable.ArrayBuffer[(LogicalPlan, LogicalPlan)]()
+    val newPlan = plan transform {
+      case Deduplicate(keys, child) if !child.isStreaming =>
+        val keyExprIds = keys.map(_.exprId)
+        val aggCols = child.output.map { attr =>
+          if (keyExprIds.contains(attr.exprId)) {
+            attr -> attr
+          } else {
+            val alias = Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
+            alias -> alias.newInstance()
+          }
+        }.unzip
+        // SPARK-22951: Physical aggregate operators distinguish global aggregations and grouping
+        // aggregations by checking the number of grouping keys. The key difference here is that a
+        // global aggregation always returns at least one row even if there are no input rows. Here
+        // we append a literal when the grouping key list is empty so that the result aggregate
+        // operator is properly treated as a grouping aggregation.
+        val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
+        val newAgg = Aggregate(nonemptyKeys, aggCols._1, child)
+        rewritePlanMap += newAgg -> Aggregate(nonemptyKeys, aggCols._2, child)
+        newAgg
+    }
+
+    if (rewritePlanMap.nonEmpty) {
+      assert(!plan.fastEquals(newPlan))
+      Analyzer.rewritePlan(newPlan, rewritePlanMap.toMap)._1
+    } else {
+      plan
+    }
   }
 }
 
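Review note: a toy before/after for the rewritten rule (attribute ids hypothetical), in the same plan-tree notation as the Analyzer doc comment above:

    Deduplicate([a#1], Relation[a#1, b#2])
      => kept in plan:  Aggregate([a#1], [a#1, first(b#2) AS b#2], Relation[a#1, b#2])
      => rewrite twin:  Aggregate([a#1], [a#1, first(b#2) AS b#3], Relation[a#1, b#2])

The first Aggregate reuses b's ExprId (#2) for the `first` alias, which keeps parent operators resolving but breaks global ExprId uniqueness; the twin built via `alias.newInstance()` gets a fresh id (#3), and `Analyzer.rewritePlan` then swaps the twin in while updating every parent reference from b#2 to b#3.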