Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ object SimpleAnalyzer extends Analyzer(
/**
* Provides a way to keep state during the analysis, this enables us to decouple the concerns
* of analysis environment from the catalog.
* The state that is kept here is per-query.
*
* Note this is thread local.
*
Expand All @@ -70,6 +71,8 @@ object AnalysisContext {
}

def get: AnalysisContext = value.get()
def reset(): Unit = value.remove()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be resolved by the future PR.


private def set(context: AnalysisContext): Unit = value.set(context)

def withAnalysisContext[A](database: Option[String])(f: => A): A = {
Expand All @@ -95,6 +98,17 @@ class Analyzer(
this(catalog, conf, conf.optimizerMaxIterations)
}

override def execute(plan: LogicalPlan): LogicalPlan = {
AnalysisContext.reset()
try {
executeSameContext(plan)
} finally {
AnalysisContext.reset()
}
}

private def executeSameContext(plan: LogicalPlan): LogicalPlan = super.execute(plan)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executeWithSameContext?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1


def resolver: Resolver = conf.resolver

protected val fixedPoint = FixedPoint(maxIterations)
Expand Down Expand Up @@ -176,7 +190,7 @@ class Analyzer(
case With(child, relations) =>
substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) {
case (resolved, (name, relation)) =>
resolved :+ name -> execute(substituteCTE(relation, resolved))
resolved :+ name -> executeSameContext(substituteCTE(relation, resolved))
})
case other => other
}
Expand Down Expand Up @@ -600,7 +614,7 @@ class Analyzer(
"avoid errors. Increase the value of spark.sql.view.maxNestedViewDepth to work " +
"aroud this.")
}
execute(child)
executeSameContext(child)
}
view.copy(child = newChild)
case p @ SubqueryAlias(_, view: View) =>
Expand Down Expand Up @@ -1269,7 +1283,7 @@ class Analyzer(
do {
// Try to resolve the subquery plan using the regular analyzer.
previous = current
current = execute(current)
current = executeSameContext(current)

// Use the outer references to resolve the subquery plan if it isn't resolved yet.
val i = plans.iterator
Expand Down Expand Up @@ -1392,7 +1406,7 @@ class Analyzer(
grouping,
Alias(cond, "havingCondition")() :: Nil,
child)
val resolvedOperator = execute(aggregatedCondition)
val resolvedOperator = executeSameContext(aggregatedCondition)
def resolvedAggregateFilter =
resolvedOperator
.asInstanceOf[Aggregate]
Expand Down Expand Up @@ -1450,7 +1464,8 @@ class Analyzer(
val aliasedOrdering =
unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")())
val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering)
val resolvedAggregate: Aggregate = execute(aggregatedOrdering).asInstanceOf[Aggregate]
val resolvedAggregate: Aggregate =
executeSameContext(aggregatedOrdering).asInstanceOf[Aggregate]
val resolvedAliasedOrdering: Seq[Alias] =
resolvedAggregate.aggregateExpressions.asInstanceOf[Seq[Alias]]

Expand Down