Skip to content

Commit 9c0fc55

Browse files
committed
resolve outer references and normal columns in the same analyzer batch
1 parent 5b13a51 commit 9c0fc55

File tree

1 file changed

+57
-49
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis

1 file changed

+57
-49
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 57 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ class Analyzer(override val catalogManager: CatalogManager)
298298
ResolveOrdinalInOrderByAndGroupBy ::
299299
ResolveAggAliasInGroupBy ::
300300
ResolveMissingReferences ::
301+
ResolveOuterReferences ::
301302
ExtractGenerator ::
302303
ResolveGenerate ::
303304
ResolveFunctions ::
@@ -2109,6 +2110,51 @@ class Analyzer(override val catalogManager: CatalogManager)
21092110
}
21102111
}
21112112

2113+
/**
2114+
* Resolves `UnresolvedAttribute` to `OuterReference` if we are resolving subquery plans (when
2115+
* `AnalysisContext.get.outerPlan` is set).
2116+
*/
2117+
object ResolveOuterReferences extends Rule[LogicalPlan] {
2118+
override def apply(plan: LogicalPlan): LogicalPlan = {
2119+
// Only apply this rule if we are resolving subquery plans.
2120+
if (AnalysisContext.get.outerPlan.isEmpty) return plan
2121+
2122+
// We must run these 3 rules first, as they also resolve `UnresolvedAttribute` and have
2123+
// higher priority than outer reference resolution.
2124+
val prepared = ResolveAggregateFunctions(ResolveMissingReferences(ResolveReferences(plan)))
2125+
prepared.resolveOperatorsDownWithPruning(_.containsPattern(UNRESOLVED_ATTRIBUTE)) {
2126+
// Handle `Generate` specially here, because `Generate.generatorOutput` starts with
2127+
// `UnresolvedAttribute` but we should never resolve it to outer references. It's a bit
2128+
// hacky that `Generate` uses `UnresolvedAttribute` to store the generator column names;
2129+
// we should clean it up later.
2130+
case g: Generate if g.childrenResolved && !g.resolved =>
2131+
val newGenerator = g.generator.transformWithPruning(
2132+
_.containsPattern(UNRESOLVED_ATTRIBUTE))(resolveOuterReference)
2133+
val resolved = g.copy(generator = newGenerator.asInstanceOf[Generator])
2134+
resolved.copyTagsFrom(g)
2135+
resolved
2136+
case q: LogicalPlan if q.childrenResolved && !q.resolved =>
2137+
q.transformExpressionsWithPruning(
2138+
_.containsPattern(UNRESOLVED_ATTRIBUTE))(resolveOuterReference)
2139+
}
2140+
}
2141+
2142+
private val resolveOuterReference: PartialFunction[Expression, Expression] = {
2143+
case u @ UnresolvedAttribute(nameParts) => withPosition(u) {
2144+
try {
2145+
AnalysisContext.get.outerPlan.get.resolveChildren(nameParts, resolver) match {
2146+
case Some(resolved) => wrapOuterReference(resolved)
2147+
case None => u
2148+
}
2149+
} catch {
2150+
case ae: AnalysisException =>
2151+
logDebug(ae.getMessage)
2152+
u
2153+
}
2154+
}
2155+
}
2156+
}
2157+
21122158
/**
21132159
* Checks whether a function identifier referenced by an [[UnresolvedFunction]] is defined in the
21142160
* function registry. Note that this rule doesn't try to resolve the [[UnresolvedFunction]]. It
@@ -2482,65 +2528,27 @@ class Analyzer(override val catalogManager: CatalogManager)
24822528
*/
24832529
object ResolveSubquery extends Rule[LogicalPlan] {
24842530
/**
2485-
* Resolve the correlated expressions in a subquery, as if the expressions live in the outer
2486-
* plan. All resolved outer references are wrapped in an [[OuterReference]]
2487-
*/
2488-
private def resolveOuterReferences(plan: LogicalPlan, outer: LogicalPlan): LogicalPlan = {
2489-
plan.resolveOperatorsDownWithPruning(_.containsPattern(UNRESOLVED_ATTRIBUTE)) {
2490-
case q: LogicalPlan if q.childrenResolved && !q.resolved =>
2491-
q.transformExpressionsWithPruning(_.containsPattern(UNRESOLVED_ATTRIBUTE)) {
2492-
case u @ UnresolvedAttribute(nameParts) =>
2493-
withPosition(u) {
2494-
try {
2495-
outer.resolveChildren(nameParts, resolver) match {
2496-
case Some(outerAttr) => wrapOuterReference(outerAttr)
2497-
case None => u
2498-
}
2499-
} catch {
2500-
case ae: AnalysisException =>
2501-
logDebug(ae.getMessage)
2502-
u
2503-
}
2504-
}
2505-
}
2506-
}
2507-
}
2508-
2509-
/**
2510-
* Resolves the subquery plan that is referenced in a subquery expression. The normal
2511-
* attribute references are resolved using regular analyzer and the outer references are
2512-
* resolved from the outer plans using the resolveOuterReferences method.
2531+
* Resolves the subquery plan that is referenced in a subquery expression, by invoking the
2532+
* entire analyzer recursively. We set the outer plan in `AnalysisContext`, so that the analyzer
2533+
* can resolve outer references.
25132534
*
2514-
* Outer references from the correlated predicates are updated as children of
2515-
* Subquery expression.
2535+
* Outer references of the subquery are recorded as children of the subquery expression.
25162536
*/
25172537
private def resolveSubQuery(
25182538
e: SubqueryExpression,
25192539
outer: LogicalPlan)(
25202540
f: (LogicalPlan, Seq[Expression]) => SubqueryExpression): SubqueryExpression = {
2521-
// Step 1: Resolve the outer expressions.
2522-
var previous: LogicalPlan = null
2523-
var current = e.plan
2524-
do {
2525-
// Try to resolve the subquery plan using the regular analyzer.
2526-
previous = current
2527-
current = AnalysisContext.withOuterPlan(outer) {
2528-
executeSameContext(current)
2529-
}
2530-
2531-
// Use the outer references to resolve the subquery plan if it isn't resolved yet.
2532-
if (!current.resolved) {
2533-
current = resolveOuterReferences(current, outer)
2534-
}
2535-
} while (!current.resolved && !current.fastEquals(previous))
2541+
val newSubqueryPlan = AnalysisContext.withOuterPlan(outer) {
2542+
executeSameContext(e.plan)
2543+
}
25362544

2537-
// Step 2: If the subquery plan is fully resolved, pull the outer references and record
2545+
// If the subquery plan is fully resolved, pull the outer references and record
25382546
// them as children of SubqueryExpression.
2539-
if (current.resolved) {
2547+
if (newSubqueryPlan.resolved) {
25402548
// Record the outer references as children of subquery expression.
2541-
f(current, SubExprUtils.getOuterReferences(current))
2549+
f(newSubqueryPlan, SubExprUtils.getOuterReferences(newSubqueryPlan))
25422550
} else {
2543-
e.withNewPlan(current)
2551+
e.withNewPlan(newSubqueryPlan)
25442552
}
25452553
}
25462554

0 commit comments

Comments
 (0)