Skip to content

Commit e0c090f

Browse files
committed
[SPARK-22932][SQL] Refactor AnalysisContext
## What changes were proposed in this pull request? Add a `reset` function to ensure the state in `AnalysisContext ` is per-query. ## How was this patch tested? The existing test cases Author: gatorsmile <[email protected]> Closes #20127 from gatorsmile/refactorAnalysisContext.
1 parent e734a4b commit e0c090f

File tree

1 file changed

+20
-5
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis

1 file changed

+20
-5
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ object SimpleAnalyzer extends Analyzer(
5252
/**
5353
* Provides a way to keep state during the analysis, this enables us to decouple the concerns
5454
* of analysis environment from the catalog.
55+
* The state that is kept here is per-query.
5556
*
5657
* Note this is thread local.
5758
*
@@ -70,6 +71,8 @@ object AnalysisContext {
7071
}
7172

7273
def get: AnalysisContext = value.get()
74+
def reset(): Unit = value.remove()
75+
7376
private def set(context: AnalysisContext): Unit = value.set(context)
7477

7578
def withAnalysisContext[A](database: Option[String])(f: => A): A = {
@@ -95,6 +98,17 @@ class Analyzer(
9598
this(catalog, conf, conf.optimizerMaxIterations)
9699
}
97100

101+
override def execute(plan: LogicalPlan): LogicalPlan = {
102+
AnalysisContext.reset()
103+
try {
104+
executeSameContext(plan)
105+
} finally {
106+
AnalysisContext.reset()
107+
}
108+
}
109+
110+
private def executeSameContext(plan: LogicalPlan): LogicalPlan = super.execute(plan)
111+
98112
def resolver: Resolver = conf.resolver
99113

100114
protected val fixedPoint = FixedPoint(maxIterations)
@@ -176,7 +190,7 @@ class Analyzer(
176190
case With(child, relations) =>
177191
substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) {
178192
case (resolved, (name, relation)) =>
179-
resolved :+ name -> execute(substituteCTE(relation, resolved))
193+
resolved :+ name -> executeSameContext(substituteCTE(relation, resolved))
180194
})
181195
case other => other
182196
}
@@ -600,7 +614,7 @@ class Analyzer(
600614
"avoid errors. Increase the value of spark.sql.view.maxNestedViewDepth to work " +
601615
"aroud this.")
602616
}
603-
execute(child)
617+
executeSameContext(child)
604618
}
605619
view.copy(child = newChild)
606620
case p @ SubqueryAlias(_, view: View) =>
@@ -1269,7 +1283,7 @@ class Analyzer(
12691283
do {
12701284
// Try to resolve the subquery plan using the regular analyzer.
12711285
previous = current
1272-
current = execute(current)
1286+
current = executeSameContext(current)
12731287

12741288
// Use the outer references to resolve the subquery plan if it isn't resolved yet.
12751289
val i = plans.iterator
@@ -1392,7 +1406,7 @@ class Analyzer(
13921406
grouping,
13931407
Alias(cond, "havingCondition")() :: Nil,
13941408
child)
1395-
val resolvedOperator = execute(aggregatedCondition)
1409+
val resolvedOperator = executeSameContext(aggregatedCondition)
13961410
def resolvedAggregateFilter =
13971411
resolvedOperator
13981412
.asInstanceOf[Aggregate]
@@ -1450,7 +1464,8 @@ class Analyzer(
14501464
val aliasedOrdering =
14511465
unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")())
14521466
val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering)
1453-
val resolvedAggregate: Aggregate = execute(aggregatedOrdering).asInstanceOf[Aggregate]
1467+
val resolvedAggregate: Aggregate =
1468+
executeSameContext(aggregatedOrdering).asInstanceOf[Aggregate]
14541469
val resolvedAliasedOrdering: Seq[Alias] =
14551470
resolvedAggregate.aggregateExpressions.asInstanceOf[Seq[Alias]]
14561471

0 commit comments

Comments
 (0)