diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 1daa8ea36bf3..7f66ddaa8942 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -298,6 +298,7 @@ class Analyzer(override val catalogManager: CatalogManager) ResolveOrdinalInOrderByAndGroupBy :: ResolveAggAliasInGroupBy :: ResolveMissingReferences :: + ResolveOuterReferences :: ExtractGenerator :: ResolveGenerate :: ResolveFunctions :: @@ -2109,6 +2110,51 @@ class Analyzer(override val catalogManager: CatalogManager) } } + /** + * Resolves `UnresolvedAttribute` to `OuterReference` if we are resolving subquery plans (when + * `AnalysisContext.get.outerPlan` is set). + */ + object ResolveOuterReferences extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { + // Only apply this rule if we are resolving subquery plans. + if (AnalysisContext.get.outerPlan.isEmpty) return plan + + // We must run these 3 rules first, as they also resolve `UnresolvedAttribute` and have + // higher priority than outer reference resolution. + val prepared = ResolveAggregateFunctions(ResolveMissingReferences(ResolveReferences(plan))) + prepared.resolveOperatorsDownWithPruning(_.containsPattern(UNRESOLVED_ATTRIBUTE)) { + // Handle `Generate` specially here, because `Generate.generatorOutput` starts with + // `UnresolvedAttribute` but we should never resolve it to outer references. It's a bit + // hacky that `Generate` uses `UnresolvedAttribute` to store the generator column names, + // we should clean it up later. + case g: Generate if g.childrenResolved && !g.resolved => + val newGenerator = g.generator.transformWithPruning( + _.containsPattern(UNRESOLVED_ATTRIBUTE))(resolveOuterReference) + val resolved = g.copy(generator = newGenerator.asInstanceOf[Generator]) + resolved.copyTagsFrom(g) + resolved + case q: LogicalPlan if q.childrenResolved && !q.resolved => + q.transformExpressionsWithPruning( + _.containsPattern(UNRESOLVED_ATTRIBUTE))(resolveOuterReference) + } + } + + private val resolveOuterReference: PartialFunction[Expression, Expression] = { + case u @ UnresolvedAttribute(nameParts) => withPosition(u) { + try { + AnalysisContext.get.outerPlan.get.resolveChildren(nameParts, resolver) match { + case Some(resolved) => wrapOuterReference(resolved) + case None => u + } + } catch { + case ae: AnalysisException => + logDebug(ae.getMessage) + u + } + } + } + } + /** * Checks whether a function identifier referenced by an [[UnresolvedFunction]] is defined in the * function registry. Note that this rule doesn't try to resolve the [[UnresolvedFunction]]. It @@ -2482,65 +2528,27 @@ class Analyzer(override val catalogManager: CatalogManager) */ object ResolveSubquery extends Rule[LogicalPlan] { /** - * Resolve the correlated expressions in a subquery, as if the expressions live in the outer - * plan. All resolved outer references are wrapped in an [[OuterReference]] - */ - private def resolveOuterReferences(plan: LogicalPlan, outer: LogicalPlan): LogicalPlan = { - plan.resolveOperatorsDownWithPruning(_.containsPattern(UNRESOLVED_ATTRIBUTE)) { - case q: LogicalPlan if q.childrenResolved && !q.resolved => - q.transformExpressionsWithPruning(_.containsPattern(UNRESOLVED_ATTRIBUTE)) { - case u @ UnresolvedAttribute(nameParts) => - withPosition(u) { - try { - outer.resolveChildren(nameParts, resolver) match { - case Some(outerAttr) => wrapOuterReference(outerAttr) - case None => u - } - } catch { - case ae: AnalysisException => - logDebug(ae.getMessage) - u - } - } - } - } - } - - /** - * Resolves the subquery plan that is referenced in a subquery expression. The normal - * attribute references are resolved using regular analyzer and the outer references are - * resolved from the outer plans using the resolveOuterReferences method. + * Resolves the subquery plan that is referenced in a subquery expression, by invoking the + * entire analyzer recursively. We set outer plan in `AnalysisContext`, so that the analyzer + * can resolve outer references. * - * Outer references from the correlated predicates are updated as children of - * Subquery expression. + * Outer references of the subquery are updated as children of Subquery expression. */ private def resolveSubQuery( e: SubqueryExpression, outer: LogicalPlan)( f: (LogicalPlan, Seq[Expression]) => SubqueryExpression): SubqueryExpression = { - // Step 1: Resolve the outer expressions. - var previous: LogicalPlan = null - var current = e.plan - do { - // Try to resolve the subquery plan using the regular analyzer. - previous = current - current = AnalysisContext.withOuterPlan(outer) { - executeSameContext(current) - } - - // Use the outer references to resolve the subquery plan if it isn't resolved yet. - if (!current.resolved) { - current = resolveOuterReferences(current, outer) - } - } while (!current.resolved && !current.fastEquals(previous)) + val newSubqueryPlan = AnalysisContext.withOuterPlan(outer) { + executeSameContext(e.plan) + } - // Step 2: If the subquery plan is fully resolved, pull the outer references and record + // If the subquery plan is fully resolved, pull the outer references and record // them as children of SubqueryExpression. - if (current.resolved) { + if (newSubqueryPlan.resolved) { // Record the outer references as children of subquery expression. - f(current, SubExprUtils.getOuterReferences(current)) + f(newSubqueryPlan, SubExprUtils.getOuterReferences(newSubqueryPlan)) } else { - e.withNewPlan(current) + e.withNewPlan(newSubqueryPlan) } }