Skip to content

Commit 9f5a0e4

Browse files
committed
Add analysis barrier around analyzed plans.
1 parent dff440f commit 9f5a0e4

File tree

17 files changed

+155
-150
lines changed

17 files changed

+155
-150
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -165,14 +165,15 @@ class Analyzer(
165165
Batch("Subquery", Once,
166166
UpdateOuterReferences),
167167
Batch("Cleanup", fixedPoint,
168-
CleanupAliases)
168+
CleanupAliases,
169+
EliminateBarriers)
169170
)
170171

171172
/**
172173
* Analyze cte definitions and substitute child plan with analyzed cte definitions.
173174
*/
174175
object CTESubstitution extends Rule[LogicalPlan] {
175-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
176+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
176177
case With(child, relations) =>
177178
substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) {
178179
case (resolved, (name, relation)) =>
@@ -200,7 +201,7 @@ class Analyzer(
200201
* Substitute child plan with WindowSpecDefinitions.
201202
*/
202203
object WindowsSubstitution extends Rule[LogicalPlan] {
203-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
204+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
204205
// Lookup WindowSpecDefinitions. This rule works with unresolved children.
205206
case WithWindowDefinition(windowDefinitions, child) =>
206207
child.transform {
@@ -242,7 +243,7 @@ class Analyzer(
242243
private def hasUnresolvedAlias(exprs: Seq[NamedExpression]) =
243244
exprs.exists(_.find(_.isInstanceOf[UnresolvedAlias]).isDefined)
244245

245-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
246+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
246247
case Aggregate(groups, aggs, child) if child.resolved && hasUnresolvedAlias(aggs) =>
247248
Aggregate(groups, assignAliases(aggs), child)
248249

@@ -611,7 +612,7 @@ class Analyzer(
611612
case _ => plan
612613
}
613614

614-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
615+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
615616
case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
616617
EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
617618
case v: View =>
@@ -666,7 +667,9 @@ class Analyzer(
666667
* Generate a new logical plan for the right child with different expression IDs
667668
* for all conflicting attributes.
668669
*/
669-
private def dedupRight (left: LogicalPlan, right: LogicalPlan): LogicalPlan = {
670+
private def dedupRight (left: LogicalPlan, oriRight: LogicalPlan): LogicalPlan = {
671+
// Remove analysis barrier if any.
672+
val right = EliminateBarriers(oriRight)
670673
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
671674
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} " +
672675
s"between $left and $right")
@@ -709,7 +712,7 @@ class Analyzer(
709712
* that this rule cannot handle. When that is the case, there must be another rule
710713
* that resolves these conflicts. Otherwise, the analysis will fail.
711714
*/
712-
right
715+
oriRight
713716
case Some((oldRelation, newRelation)) =>
714717
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
715718
val newRight = right transformUp {
@@ -722,7 +725,7 @@ class Analyzer(
722725
s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
723726
}
724727
}
725-
newRight
728+
AnalysisBarrier(newRight)
726729
}
727730
}
728731

@@ -799,7 +802,7 @@ class Analyzer(
799802
case _ => e.mapChildren(resolve(_, q))
800803
}
801804

802-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
805+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
803806
case p: LogicalPlan if !p.childrenResolved => p
804807

805808
// If the projection list contains Stars, expand it.
@@ -993,7 +996,7 @@ class Analyzer(
993996
* have no effect on the results.
994997
*/
995998
object ResolveOrdinalInOrderByAndGroupBy extends Rule[LogicalPlan] {
996-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
999+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
9971000
case p if !p.childrenResolved => p
9981001
// Replace the index with the related attribute for ORDER BY,
9991002
// which is a 1-base position of the projection list.
@@ -1049,7 +1052,7 @@ class Analyzer(
10491052
}}
10501053
}
10511054

1052-
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1055+
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
10531056
case agg @ Aggregate(groups, aggs, child)
10541057
if conf.groupByAliases && child.resolved && aggs.forall(_.resolved) &&
10551058
groups.exists(!_.resolved) =>
@@ -1073,11 +1076,13 @@ class Analyzer(
10731076
* The HAVING clause could also used a grouping columns that is not presented in the SELECT.
10741077
*/
10751078
object ResolveMissingReferences extends Rule[LogicalPlan] {
1076-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1079+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
10771080
// Skip sort with aggregate. This will be handled in ResolveAggregateFunctions
1081+
case sa @ Sort(_, _, AnalysisBarrier(child: Aggregate)) => sa
10781082
case sa @ Sort(_, _, child: Aggregate) => sa
10791083

1080-
case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
1084+
case s @ Sort(order, _, oriChild) if !s.resolved && oriChild.resolved =>
1085+
val child = EliminateBarriers(oriChild)
10811086
try {
10821087
val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
10831088
val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
@@ -1098,7 +1103,8 @@ class Analyzer(
10981103
case ae: AnalysisException => s
10991104
}
11001105

1101-
case f @ Filter(cond, child) if !f.resolved && child.resolved =>
1106+
case f @ Filter(cond, oriChild) if !f.resolved && oriChild.resolved =>
1107+
val child = EliminateBarriers(oriChild)
11021108
try {
11031109
val newCond = resolveExpressionRecursively(cond, child)
11041110
val requiredAttrs = newCond.references.filter(_.resolved)
@@ -1125,7 +1131,7 @@ class Analyzer(
11251131
*/
11261132
private def addMissingAttr(plan: LogicalPlan, missingAttrs: AttributeSet): LogicalPlan = {
11271133
if (missingAttrs.isEmpty) {
1128-
return plan
1134+
return AnalysisBarrier(plan)
11291135
}
11301136
plan match {
11311137
case p: Project =>
@@ -1197,7 +1203,7 @@ class Analyzer(
11971203
* Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s.
11981204
*/
11991205
object ResolveFunctions extends Rule[LogicalPlan] {
1200-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1206+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
12011207
case q: LogicalPlan =>
12021208
q transformExpressions {
12031209
case u if !u.childrenResolved => u // Skip until children are resolved.
@@ -1334,7 +1340,7 @@ class Analyzer(
13341340
/**
13351341
* Resolve and rewrite all subqueries in an operator tree..
13361342
*/
1337-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1343+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
13381344
// In case of HAVING (a filter after an aggregate) we use both the aggregate and
13391345
// its child for resolution.
13401346
case f @ Filter(_, a: Aggregate) if f.childrenResolved =>
@@ -1350,7 +1356,7 @@ class Analyzer(
13501356
*/
13511357
object ResolveSubqueryColumnAliases extends Rule[LogicalPlan] {
13521358

1353-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1359+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
13541360
case u @ UnresolvedSubqueryColumnAliases(columnNames, child) if child.resolved =>
13551361
// Resolves output attributes if a query has alias names in its subquery:
13561362
// e.g., SELECT * FROM (SELECT 1 AS a, 1 AS b) t(col1, col2)
@@ -1373,7 +1379,7 @@ class Analyzer(
13731379
* Turns projections that contain aggregate expressions into aggregations.
13741380
*/
13751381
object GlobalAggregates extends Rule[LogicalPlan] {
1376-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1382+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
13771383
case Project(projectList, child) if containsAggregates(projectList) =>
13781384
Aggregate(Nil, projectList, child)
13791385
}
@@ -1399,7 +1405,9 @@ class Analyzer(
13991405
* underlying aggregate operator and then projected away after the original operator.
14001406
*/
14011407
object ResolveAggregateFunctions extends Rule[LogicalPlan] {
1402-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1408+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
1409+
case filter @ Filter(havingCondition, AnalysisBarrier(aggregate: Aggregate)) =>
1410+
apply(Filter(havingCondition, aggregate)).mapChildren(AnalysisBarrier)
14031411
case filter @ Filter(havingCondition,
14041412
aggregate @ Aggregate(grouping, originalAggExprs, child))
14051413
if aggregate.resolved =>
@@ -1459,6 +1467,8 @@ class Analyzer(
14591467
case ae: AnalysisException => filter
14601468
}
14611469

1470+
case sort @ Sort(sortOrder, global, AnalysisBarrier(aggregate: Aggregate)) =>
1471+
apply(Sort(sortOrder, global, aggregate)).mapChildren(AnalysisBarrier)
14621472
case sort @ Sort(sortOrder, global, aggregate: Aggregate) if aggregate.resolved =>
14631473

14641474
// Try resolving the ordering as though it is in the aggregate clause.
@@ -1571,7 +1581,7 @@ class Analyzer(
15711581
}
15721582
}
15731583

1574-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1584+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
15751585
case Project(projectList, _) if projectList.exists(hasNestedGenerator) =>
15761586
val nestedGenerator = projectList.find(hasNestedGenerator).get
15771587
throw new AnalysisException("Generators are not supported when it's nested in " +
@@ -1629,7 +1639,7 @@ class Analyzer(
16291639
* that wrap the [[Generator]].
16301640
*/
16311641
object ResolveGenerate extends Rule[LogicalPlan] {
1632-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1642+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
16331643
case g: Generate if !g.child.resolved || !g.generator.resolved => g
16341644
case g: Generate if !g.resolved =>
16351645
g.copy(generatorOutput = makeGeneratorOutput(g.generator, g.generatorOutput.map(_.name)))
@@ -1946,7 +1956,7 @@ class Analyzer(
19461956
* put them into an inner Project and finally project them away at the outer Project.
19471957
*/
19481958
object PullOutNondeterministic extends Rule[LogicalPlan] {
1949-
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1959+
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
19501960
case p if !p.resolved => p // Skip unresolved nodes.
19511961
case p: Project => p
19521962
case f: Filter => f
@@ -1991,7 +2001,7 @@ class Analyzer(
19912001
* and we should return null if the input is null.
19922002
*/
19932003
object HandleNullInputsForUDF extends Rule[LogicalPlan] {
1994-
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
2004+
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
19952005
case p if !p.resolved => p // Skip unresolved nodes.
19962006

19972007
case p => p transformExpressionsUp {
@@ -2056,7 +2066,7 @@ class Analyzer(
20562066
* Then apply a Project on a normal Join to eliminate natural or using join.
20572067
*/
20582068
object ResolveNaturalAndUsingJoin extends Rule[LogicalPlan] {
2059-
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
2069+
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
20602070
case j @ Join(left, right, UsingJoin(joinType, usingCols), condition)
20612071
if left.resolved && right.resolved && j.duplicateResolved =>
20622072
commonNaturalJoinProcessing(left, right, joinType, usingCols, None)
@@ -2121,7 +2131,7 @@ class Analyzer(
21212131
* to the given input attributes.
21222132
*/
21232133
object ResolveDeserializer extends Rule[LogicalPlan] {
2124-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
2134+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
21252135
case p if !p.childrenResolved => p
21262136
case p if p.resolved => p
21272137

@@ -2207,7 +2217,7 @@ class Analyzer(
22072217
* constructed is an inner class.
22082218
*/
22092219
object ResolveNewInstance extends Rule[LogicalPlan] {
2210-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
2220+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
22112221
case p if !p.childrenResolved => p
22122222
case p if p.resolved => p
22132223

@@ -2241,7 +2251,7 @@ class Analyzer(
22412251
"type of the field in the target object")
22422252
}
22432253

2244-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
2254+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
22452255
case p if !p.childrenResolved => p
22462256
case p if p.resolved => p
22472257

@@ -2300,7 +2310,7 @@ object CleanupAliases extends Rule[LogicalPlan] {
23002310
case other => trimAliases(other)
23012311
}
23022312

2303-
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
2313+
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
23042314
case Project(projectList, child) =>
23052315
val cleanedProjectList =
23062316
projectList.map(trimNonTopLevelAliases(_).asInstanceOf[NamedExpression])
@@ -2329,6 +2339,13 @@ object CleanupAliases extends Rule[LogicalPlan] {
23292339
}
23302340
}
23312341

2342+
/** Remove the barrier nodes of analysis */
2343+
object EliminateBarriers extends Rule[LogicalPlan] {
2344+
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
2345+
case AnalysisBarrier(child) => child
2346+
}
2347+
}
2348+
23322349
/**
23332350
* Ignore event time watermark in batch query, which is only supported in Structured Streaming.
23342351
* TODO: add this rule into analyzer rule list.
@@ -2379,7 +2396,7 @@ object TimeWindowing extends Rule[LogicalPlan] {
23792396
* @return the logical plan that will generate the time windows using the Expand operator, with
23802397
* the Filter operator for correctness and Project for usability.
23812398
*/
2382-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
2399+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
23832400
case p: LogicalPlan if p.children.size == 1 =>
23842401
val child = p.children.head
23852402
val windowExpressions =

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ object DecimalPrecision extends Rule[LogicalPlan] {
7878
PromotePrecision(Cast(e, dataType))
7979
}
8080

81-
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
81+
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
8282
// fix decimal precision for expressions
8383
case q => q.transformExpressionsUp(
8484
decimalAndDecimal.orElse(integralAndDecimalLiteral).orElse(nondecimalAndDecimal))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ object ResolveTableValuedFunctions extends Rule[LogicalPlan] {
103103
})
104104
)
105105

106-
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
106+
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
107107
case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) =>
108108
val resolvedFunc = builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
109109
case Some(tvf) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinals.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class SubstituteUnresolvedOrdinals(conf: SQLConf) extends Rule[LogicalPlan] {
3333
case _ => false
3434
}
3535

36-
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
36+
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
3737
case s: Sort if conf.orderByOrdinal && s.order.exists(o => isIntLiteral(o.child)) =>
3838
val newOrders = s.order.map {
3939
case order @ SortOrder(ordinal @ Literal(index: Int, IntegerType), _, _, _) =>

0 commit comments

Comments
 (0)