Skip to content

Commit 00d176d

Browse files
viiryagatorsmile
authored andcommitted
[SPARK-20392][SQL] Set barrier to prevent re-entering a tree
## What changes were proposed in this pull request? The SQL `Analyzer` goes through a whole query plan even most part of it is analyzed. This increases the time spent on query analysis for long pipelines in ML, especially. This patch adds a logical node called `AnalysisBarrier` that wraps an analyzed logical plan to prevent it from analysis again. The barrier is applied to the analyzed logical plan in `Dataset`. It won't change the output of wrapped logical plan and just acts as a wrapper to hide it from analyzer. New operations on the dataset will be put on the barrier, so only the new nodes created will be analyzed. This analysis barrier will be removed at the end of analysis stage. ## How was this patch tested? Added tests. Author: Liang-Chi Hsieh <[email protected]> Closes #19873 from viirya/SPARK-20392-reopen.
1 parent 82183f7 commit 00d176d

File tree

17 files changed

+185
-165
lines changed

17 files changed

+185
-165
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -165,14 +165,15 @@ class Analyzer(
165165
Batch("Subquery", Once,
166166
UpdateOuterReferences),
167167
Batch("Cleanup", fixedPoint,
168-
CleanupAliases)
168+
CleanupAliases,
169+
EliminateBarriers)
169170
)
170171

171172
/**
172173
* Analyze cte definitions and substitute child plan with analyzed cte definitions.
173174
*/
174175
object CTESubstitution extends Rule[LogicalPlan] {
175-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
176+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
176177
case With(child, relations) =>
177178
substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) {
178179
case (resolved, (name, relation)) =>
@@ -200,7 +201,7 @@ class Analyzer(
200201
* Substitute child plan with WindowSpecDefinitions.
201202
*/
202203
object WindowsSubstitution extends Rule[LogicalPlan] {
203-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
204+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
204205
// Lookup WindowSpecDefinitions. This rule works with unresolved children.
205206
case WithWindowDefinition(windowDefinitions, child) =>
206207
child.transform {
@@ -242,7 +243,7 @@ class Analyzer(
242243
private def hasUnresolvedAlias(exprs: Seq[NamedExpression]) =
243244
exprs.exists(_.find(_.isInstanceOf[UnresolvedAlias]).isDefined)
244245

245-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
246+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
246247
case Aggregate(groups, aggs, child) if child.resolved && hasUnresolvedAlias(aggs) =>
247248
Aggregate(groups, assignAliases(aggs), child)
248249

@@ -611,7 +612,7 @@ class Analyzer(
611612
case _ => plan
612613
}
613614

614-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
615+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
615616
case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
616617
EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
617618
case v: View =>
@@ -666,7 +667,9 @@ class Analyzer(
666667
* Generate a new logical plan for the right child with different expression IDs
667668
* for all conflicting attributes.
668669
*/
669-
private def dedupRight (left: LogicalPlan, right: LogicalPlan): LogicalPlan = {
670+
private def dedupRight (left: LogicalPlan, originalRight: LogicalPlan): LogicalPlan = {
671+
// Remove analysis barrier if any.
672+
val right = EliminateBarriers(originalRight)
670673
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
671674
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} " +
672675
s"between $left and $right")
@@ -709,7 +712,7 @@ class Analyzer(
709712
* that this rule cannot handle. When that is the case, there must be another rule
710713
* that resolves these conflicts. Otherwise, the analysis will fail.
711714
*/
712-
right
715+
originalRight
713716
case Some((oldRelation, newRelation)) =>
714717
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
715718
val newRight = right transformUp {
@@ -722,7 +725,7 @@ class Analyzer(
722725
s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
723726
}
724727
}
725-
newRight
728+
AnalysisBarrier(newRight)
726729
}
727730
}
728731

@@ -799,7 +802,7 @@ class Analyzer(
799802
case _ => e.mapChildren(resolve(_, q))
800803
}
801804

802-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
805+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
803806
case p: LogicalPlan if !p.childrenResolved => p
804807

805808
// If the projection list contains Stars, expand it.
@@ -993,7 +996,7 @@ class Analyzer(
993996
* have no effect on the results.
994997
*/
995998
object ResolveOrdinalInOrderByAndGroupBy extends Rule[LogicalPlan] {
996-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
999+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
9971000
case p if !p.childrenResolved => p
9981001
// Replace the index with the related attribute for ORDER BY,
9991002
// which is a 1-base position of the projection list.
@@ -1049,7 +1052,7 @@ class Analyzer(
10491052
}}
10501053
}
10511054

1052-
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1055+
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
10531056
case agg @ Aggregate(groups, aggs, child)
10541057
if conf.groupByAliases && child.resolved && aggs.forall(_.resolved) &&
10551058
groups.exists(!_.resolved) =>
@@ -1073,11 +1076,13 @@ class Analyzer(
10731076
* The HAVING clause could also used a grouping columns that is not presented in the SELECT.
10741077
*/
10751078
object ResolveMissingReferences extends Rule[LogicalPlan] {
1076-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1079+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
10771080
// Skip sort with aggregate. This will be handled in ResolveAggregateFunctions
1081+
case sa @ Sort(_, _, AnalysisBarrier(child: Aggregate)) => sa
10781082
case sa @ Sort(_, _, child: Aggregate) => sa
10791083

1080-
case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
1084+
case s @ Sort(order, _, originalChild) if !s.resolved && originalChild.resolved =>
1085+
val child = EliminateBarriers(originalChild)
10811086
try {
10821087
val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
10831088
val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
@@ -1098,7 +1103,8 @@ class Analyzer(
10981103
case ae: AnalysisException => s
10991104
}
11001105

1101-
case f @ Filter(cond, child) if !f.resolved && child.resolved =>
1106+
case f @ Filter(cond, originalChild) if !f.resolved && originalChild.resolved =>
1107+
val child = EliminateBarriers(originalChild)
11021108
try {
11031109
val newCond = resolveExpressionRecursively(cond, child)
11041110
val requiredAttrs = newCond.references.filter(_.resolved)
@@ -1125,7 +1131,7 @@ class Analyzer(
11251131
*/
11261132
private def addMissingAttr(plan: LogicalPlan, missingAttrs: AttributeSet): LogicalPlan = {
11271133
if (missingAttrs.isEmpty) {
1128-
return plan
1134+
return AnalysisBarrier(plan)
11291135
}
11301136
plan match {
11311137
case p: Project =>
@@ -1197,7 +1203,7 @@ class Analyzer(
11971203
* Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s.
11981204
*/
11991205
object ResolveFunctions extends Rule[LogicalPlan] {
1200-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1206+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
12011207
case q: LogicalPlan =>
12021208
q transformExpressions {
12031209
case u if !u.childrenResolved => u // Skip until children are resolved.
@@ -1334,7 +1340,7 @@ class Analyzer(
13341340
/**
13351341
* Resolve and rewrite all subqueries in an operator tree..
13361342
*/
1337-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1343+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
13381344
// In case of HAVING (a filter after an aggregate) we use both the aggregate and
13391345
// its child for resolution.
13401346
case f @ Filter(_, a: Aggregate) if f.childrenResolved =>
@@ -1350,7 +1356,7 @@ class Analyzer(
13501356
*/
13511357
object ResolveSubqueryColumnAliases extends Rule[LogicalPlan] {
13521358

1353-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1359+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
13541360
case u @ UnresolvedSubqueryColumnAliases(columnNames, child) if child.resolved =>
13551361
// Resolves output attributes if a query has alias names in its subquery:
13561362
// e.g., SELECT * FROM (SELECT 1 AS a, 1 AS b) t(col1, col2)
@@ -1373,7 +1379,7 @@ class Analyzer(
13731379
* Turns projections that contain aggregate expressions into aggregations.
13741380
*/
13751381
object GlobalAggregates extends Rule[LogicalPlan] {
1376-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1382+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
13771383
case Project(projectList, child) if containsAggregates(projectList) =>
13781384
Aggregate(Nil, projectList, child)
13791385
}
@@ -1399,7 +1405,9 @@ class Analyzer(
13991405
* underlying aggregate operator and then projected away after the original operator.
14001406
*/
14011407
object ResolveAggregateFunctions extends Rule[LogicalPlan] {
1402-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1408+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
1409+
case filter @ Filter(havingCondition, AnalysisBarrier(aggregate: Aggregate)) =>
1410+
apply(Filter(havingCondition, aggregate)).mapChildren(AnalysisBarrier)
14031411
case filter @ Filter(havingCondition,
14041412
aggregate @ Aggregate(grouping, originalAggExprs, child))
14051413
if aggregate.resolved =>
@@ -1459,6 +1467,8 @@ class Analyzer(
14591467
case ae: AnalysisException => filter
14601468
}
14611469

1470+
case sort @ Sort(sortOrder, global, AnalysisBarrier(aggregate: Aggregate)) =>
1471+
apply(Sort(sortOrder, global, aggregate)).mapChildren(AnalysisBarrier)
14621472
case sort @ Sort(sortOrder, global, aggregate: Aggregate) if aggregate.resolved =>
14631473

14641474
// Try resolving the ordering as though it is in the aggregate clause.
@@ -1571,7 +1581,7 @@ class Analyzer(
15711581
}
15721582
}
15731583

1574-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1584+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
15751585
case Project(projectList, _) if projectList.exists(hasNestedGenerator) =>
15761586
val nestedGenerator = projectList.find(hasNestedGenerator).get
15771587
throw new AnalysisException("Generators are not supported when it's nested in " +
@@ -1629,7 +1639,7 @@ class Analyzer(
16291639
* that wrap the [[Generator]].
16301640
*/
16311641
object ResolveGenerate extends Rule[LogicalPlan] {
1632-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1642+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
16331643
case g: Generate if !g.child.resolved || !g.generator.resolved => g
16341644
case g: Generate if !g.resolved =>
16351645
g.copy(generatorOutput = makeGeneratorOutput(g.generator, g.generatorOutput.map(_.name)))
@@ -1946,7 +1956,7 @@ class Analyzer(
19461956
* put them into an inner Project and finally project them away at the outer Project.
19471957
*/
19481958
object PullOutNondeterministic extends Rule[LogicalPlan] {
1949-
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
1959+
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
19501960
case p if !p.resolved => p // Skip unresolved nodes.
19511961
case p: Project => p
19521962
case f: Filter => f
@@ -1991,7 +2001,7 @@ class Analyzer(
19912001
* and we should return null if the input is null.
19922002
*/
19932003
object HandleNullInputsForUDF extends Rule[LogicalPlan] {
1994-
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
2004+
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
19952005
case p if !p.resolved => p // Skip unresolved nodes.
19962006

19972007
case p => p transformExpressionsUp {
@@ -2056,7 +2066,7 @@ class Analyzer(
20562066
* Then apply a Project on a normal Join to eliminate natural or using join.
20572067
*/
20582068
object ResolveNaturalAndUsingJoin extends Rule[LogicalPlan] {
2059-
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
2069+
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
20602070
case j @ Join(left, right, UsingJoin(joinType, usingCols), condition)
20612071
if left.resolved && right.resolved && j.duplicateResolved =>
20622072
commonNaturalJoinProcessing(left, right, joinType, usingCols, None)
@@ -2121,7 +2131,7 @@ class Analyzer(
21212131
* to the given input attributes.
21222132
*/
21232133
object ResolveDeserializer extends Rule[LogicalPlan] {
2124-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
2134+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
21252135
case p if !p.childrenResolved => p
21262136
case p if p.resolved => p
21272137

@@ -2207,7 +2217,7 @@ class Analyzer(
22072217
* constructed is an inner class.
22082218
*/
22092219
object ResolveNewInstance extends Rule[LogicalPlan] {
2210-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
2220+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
22112221
case p if !p.childrenResolved => p
22122222
case p if p.resolved => p
22132223

@@ -2241,7 +2251,7 @@ class Analyzer(
22412251
"type of the field in the target object")
22422252
}
22432253

2244-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
2254+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
22452255
case p if !p.childrenResolved => p
22462256
case p if p.resolved => p
22472257

@@ -2300,7 +2310,7 @@ object CleanupAliases extends Rule[LogicalPlan] {
23002310
case other => trimAliases(other)
23012311
}
23022312

2303-
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
2313+
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
23042314
case Project(projectList, child) =>
23052315
val cleanedProjectList =
23062316
projectList.map(trimNonTopLevelAliases(_).asInstanceOf[NamedExpression])
@@ -2329,6 +2339,13 @@ object CleanupAliases extends Rule[LogicalPlan] {
23292339
}
23302340
}
23312341

2342+
/** Remove the barrier nodes of analysis */
2343+
object EliminateBarriers extends Rule[LogicalPlan] {
2344+
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
2345+
case AnalysisBarrier(child) => child
2346+
}
2347+
}
2348+
23322349
/**
23332350
* Ignore event time watermark in batch query, which is only supported in Structured Streaming.
23342351
* TODO: add this rule into analyzer rule list.
@@ -2379,7 +2396,7 @@ object TimeWindowing extends Rule[LogicalPlan] {
23792396
* @return the logical plan that will generate the time windows using the Expand operator, with
23802397
* the Filter operator for correctness and Project for usability.
23812398
*/
2382-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
2399+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
23832400
case p: LogicalPlan if p.children.size == 1 =>
23842401
val child = p.children.head
23852402
val windowExpressions =

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,6 @@ trait CheckAnalysis extends PredicateHelper {
7878
// We transform up and order the rules so as to catch the first possible failure instead
7979
// of the result of cascading resolution failures.
8080
plan.foreachUp {
81-
case p if p.analyzed => // Skip already analyzed sub-plans
82-
8381
case u: UnresolvedRelation =>
8482
u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")
8583

@@ -353,8 +351,6 @@ trait CheckAnalysis extends PredicateHelper {
353351
case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}")
354352
case _ =>
355353
}
356-
357-
plan.foreach(_.setAnalyzed())
358354
}
359355

360356
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ object DecimalPrecision extends TypeCoercionRule {
7878
PromotePrecision(Cast(e, dataType))
7979
}
8080

81-
override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
81+
override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transformUp {
8282
// fix decimal precision for expressions
8383
case q => q.transformExpressionsUp(
8484
decimalAndDecimal.orElse(integralAndDecimalLiteral).orElse(nondecimalAndDecimal))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ object ResolveTableValuedFunctions extends Rule[LogicalPlan] {
103103
})
104104
)
105105

106-
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
106+
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
107107
case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) =>
108108
val resolvedFunc = builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
109109
case Some(tvf) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinals.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class SubstituteUnresolvedOrdinals(conf: SQLConf) extends Rule[LogicalPlan] {
3333
case _ => false
3434
}
3535

36-
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
36+
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
3737
case s: Sort if conf.orderByOrdinal && s.order.exists(o => isIntLiteral(o.child)) =>
3838
val newOrders = s.order.map {
3939
case order @ SortOrder(ordinal @ Literal(index: Int, IntegerType), _, _, _) =>

0 commit comments

Comments
 (0)