@@ -298,6 +298,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       ResolveOrdinalInOrderByAndGroupBy ::
       ResolveAggAliasInGroupBy ::
       ResolveMissingReferences ::
+      ResolveOuterReferences ::
       ExtractGenerator ::
       ResolveGenerate ::
       ResolveFunctions ::
@@ -2109,6 +2110,51 @@ class Analyzer(override val catalogManager: CatalogManager)
     }
   }

+  /**
+   * Resolves `UnresolvedAttribute` to `OuterReference` if we are resolving subquery plans (when
+   * `AnalysisContext.get.outerPlan` is set).
+   */
+  object ResolveOuterReferences extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = {
+      // Only apply this rule if we are resolving subquery plans.
+      if (AnalysisContext.get.outerPlan.isEmpty) return plan
+
+      // We must run these 3 rules first, as they also resolve `UnresolvedAttribute` and have
+      // higher priority than outer reference resolution.
+      val prepared = ResolveAggregateFunctions(ResolveMissingReferences(ResolveReferences(plan)))
Member

@viirya, Dec 1, 2022

Aren't ResolveMissingReferences and ResolveReferences already run before ResolveOuterReferences? Could we just run this rule on its own?

Contributor Author

The key is to run ResolveOuterReferences right after these 3 rules, so that it's safe to resolve UnresolvedAttribute to outer references. Otherwise, other rules may change the plan shape and make these 3 rules applicable again.
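
The ordering argument can be illustrated with a toy model. This is only a sketch under the assumption that local resolution must always win over outer resolution; the `Expr` types and the `resolveLocal`, `resolveOuter`, and `localThenOuter` helpers are hypothetical and are not Catalyst APIs.

```scala
object RuleOrderSketch {
  // Toy expression model; these types are illustrative, not Catalyst classes.
  sealed trait Expr
  final case class Unresolved(name: String) extends Expr
  final case class Local(name: String) extends Expr
  final case class Outer(name: String) extends Expr

  type Rule = Seq[Expr] => Seq[Expr]

  // Resolves names that exist in the subquery's own scope.
  def resolveLocal(scope: Set[String]): Rule =
    _.map {
      case Unresolved(n) if scope(n) => Local(n)
      case other => other
    }

  // Resolves whatever is still unresolved against the outer scope.
  def resolveOuter(scope: Set[String]): Rule =
    _.map {
      case Unresolved(n) if scope(n) => Outer(n)
      case other => other
    }

  // Composing both steps in one function pins their relative order:
  // no other rule can run between local and outer resolution.
  def localThenOuter(local: Set[String], outer: Set[String]): Rule =
    resolveLocal(local).andThen(resolveOuter(outer))

  def main(args: Array[String]): Unit = {
    val plan = Seq(Unresolved("id"), Unresolved("k"))
    println(localThenOuter(Set("id"), Set("id", "k"))(plan))
  }
}
```

In this toy, `id` exists in both scopes and resolves locally, while `k` falls through to the outer scope. Running `resolveOuter` on its own against the same input would turn `id` into an outer reference as well, which is the kind of mis-resolution the fixed ordering is meant to prevent.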

Contributor

I guess one disadvantage of running these three rules inside ResolveOuterReferences is that they are not visible in the plan change log.

Contributor

It's also a bit strange: when there is no outer reference to resolve but the 3 rules take effect, the plan change logger attributes their changes to ResolveOuterReferences:

=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOuterReferences ===
 'Project [id2#28L]                                                                                          'Project [id2#28L]
 +- 'SubqueryAlias __auto_generated_subquery_name                                                            +- 'SubqueryAlias __auto_generated_subquery_name
!   +- 'Project [id#27, ('id + cast(1 as bigint)) AS id2#28]                                                    +- 'Project [id#27, (id#27 + cast(1 as bigint)) AS id2#28]
..

Contributor Author

Yes, this is not a perfect solution, but AFAIK this is the only reliable way to guarantee rule execution order. The best solution in my opinion is to centralize all column resolution code in one rule, but that's a much larger change.
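
The logging side effect discussed in this thread can be reproduced in miniature. The sketch below assumes a change log that records the name of whichever top-level rule altered the plan; `NamedRule` and `execute` are hypothetical stand-ins, not Spark's rule executor, and the plan is modelled as a plain string.

```scala
object PlanChangeLogSketch {
  // A named rewrite over a toy "plan", modelled as a plain string.
  final case class NamedRule(name: String, run: String => String)

  // Applies the rules in order and records the name of each rule whose output
  // differs from its input, which is roughly what a plan change log reports.
  def execute(plan: String, rules: Seq[NamedRule]): (String, Seq[String]) =
    rules.foldLeft((plan, Seq.empty[String])) { case ((current, log), rule) =>
      val next = rule.run(current)
      if (next == current) (current, log) else (next, log :+ rule.name)
    }

  // Stand-in for a rule that resolves a local column.
  val resolveReferences: NamedRule =
    NamedRule("ResolveReferences", _.replace("'id", "id#27"))

  // Wrapper that runs the inner rule first and then finds no outer reference to resolve.
  val resolveOuterReferences: NamedRule =
    NamedRule("ResolveOuterReferences", plan => resolveReferences.run(plan))

  def main(args: Array[String]): Unit = {
    val (plan, log) = execute("'id + 1", Seq(resolveOuterReferences))
    println(s"$plan changed by: ${log.mkString(", ")}")
  }
}
```

Because the inner rule runs inside the wrapper, the executor in this toy only ever sees the wrapper's name, so the local-column change is logged under `ResolveOuterReferences`.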

+      prepared.resolveOperatorsDownWithPruning(_.containsPattern(UNRESOLVED_ATTRIBUTE)) {
+        // Handle `Generate` specially here, because `Generate.generatorOutput` starts with
+        // `UnresolvedAttribute` but we should never resolve it to outer references. It's a bit
+        // hacky that `Generate` uses `UnresolvedAttribute` to store the generator column names,
+        // we should clean it up later.
+        case g: Generate if g.childrenResolved && !g.resolved =>
Contributor

Do we have a unit test for this case?

Contributor Author

Existing tests failed without this change.

+          val newGenerator = g.generator.transformWithPruning(
+            _.containsPattern(UNRESOLVED_ATTRIBUTE))(resolveOuterReference)
+          val resolved = g.copy(generator = newGenerator.asInstanceOf[Generator])
+          resolved.copyTagsFrom(g)
+          resolved
+        case q: LogicalPlan if q.childrenResolved && !q.resolved =>
+          q.transformExpressionsWithPruning(
+            _.containsPattern(UNRESOLVED_ATTRIBUTE))(resolveOuterReference)
+      }
+    }
+
+    private val resolveOuterReference: PartialFunction[Expression, Expression] = {
+      case u @ UnresolvedAttribute(nameParts) => withPosition(u) {
+        try {
+          AnalysisContext.get.outerPlan.get.resolveChildren(nameParts, resolver) match {
+            case Some(resolved) => wrapOuterReference(resolved)
+            case None => u
+          }
+        } catch {
+          case ae: AnalysisException =>
+            logDebug(ae.getMessage)
+            u
+        }
+      }
+    }
+  }
+
   /**
    * Checks whether a function identifier referenced by an [[UnresolvedFunction]] is defined in the
    * function registry. Note that this rule doesn't try to resolve the [[UnresolvedFunction]]. It
@@ -2482,65 +2528,27 @@ class Analyzer(override val catalogManager: CatalogManager)
    */
   object ResolveSubquery extends Rule[LogicalPlan] {
-    /**
-     * Resolve the correlated expressions in a subquery, as if the expressions live in the outer
-     * plan. All resolved outer references are wrapped in an [[OuterReference]]
-     */
-    private def resolveOuterReferences(plan: LogicalPlan, outer: LogicalPlan): LogicalPlan = {
-      plan.resolveOperatorsDownWithPruning(_.containsPattern(UNRESOLVED_ATTRIBUTE)) {
-        case q: LogicalPlan if q.childrenResolved && !q.resolved =>
-          q.transformExpressionsWithPruning(_.containsPattern(UNRESOLVED_ATTRIBUTE)) {
-            case u @ UnresolvedAttribute(nameParts) =>
-              withPosition(u) {
-                try {
-                  outer.resolveChildren(nameParts, resolver) match {
-                    case Some(outerAttr) => wrapOuterReference(outerAttr)
-                    case None => u
-                  }
-                } catch {
-                  case ae: AnalysisException =>
-                    logDebug(ae.getMessage)
-                    u
-                }
-              }
-          }
-      }
-    }
-
     /**
-     * Resolves the subquery plan that is referenced in a subquery expression. The normal
-     * attribute references are resolved using regular analyzer and the outer references are
-     * resolved from the outer plans using the resolveOuterReferences method.
+     * Resolves the subquery plan that is referenced in a subquery expression, by invoking the
+     * entire analyzer recursively. We set outer plan in `AnalysisContext`, so that the analyzer
+     * can resolve outer references.
      *
-     * Outer references from the correlated predicates are updated as children of
-     * Subquery expression.
+     * Outer references of the subquery are updated as children of Subquery expression.
      */
     private def resolveSubQuery(
         e: SubqueryExpression,
         outer: LogicalPlan)(
         f: (LogicalPlan, Seq[Expression]) => SubqueryExpression): SubqueryExpression = {
-      // Step 1: Resolve the outer expressions.
-      var previous: LogicalPlan = null
-      var current = e.plan
-      do {
-        // Try to resolve the subquery plan using the regular analyzer.
-        previous = current
-        current = AnalysisContext.withOuterPlan(outer) {
-          executeSameContext(current)
-        }
-
-        // Use the outer references to resolve the subquery plan if it isn't resolved yet.
-        if (!current.resolved) {
-          current = resolveOuterReferences(current, outer)
-        }
-      } while (!current.resolved && !current.fastEquals(previous))
+      val newSubqueryPlan = AnalysisContext.withOuterPlan(outer) {
+        executeSameContext(e.plan)
+      }

-      // Step 2: If the subquery plan is fully resolved, pull the outer references and record
+      // If the subquery plan is fully resolved, pull the outer references and record
       // them as children of SubqueryExpression.
-      if (current.resolved) {
+      if (newSubqueryPlan.resolved) {
         // Record the outer references as children of subquery expression.
-        f(current, SubExprUtils.getOuterReferences(current))
+        f(newSubqueryPlan, SubExprUtils.getOuterReferences(newSubqueryPlan))
       } else {
-        e.withNewPlan(current)
+        e.withNewPlan(newSubqueryPlan)
       }
     }
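
The hunk above replaces the explicit fixed-point loop with a single analyzer run that sees the outer plan through the analysis context. The sketch below illustrates that general pattern with a scoped, thread-local value. It is an illustration only: `outerColumns`, `withOuterPlan`, and `resolve` here are hypothetical, operate on column-name sets rather than logical plans, and make no claim about how `AnalysisContext` is actually implemented.

```scala
object OuterPlanContextSketch {
  // Hypothetical stand-in for the analysis context: the enclosing query's
  // column names, set only while a subquery is being resolved.
  private val outerColumns = new ThreadLocal[Option[Set[String]]] {
    override def initialValue(): Option[Set[String]] = None
  }

  // Runs `body` with the outer columns visible, then restores the previous value
  // so that nested subqueries and later top-level queries are unaffected.
  def withOuterPlan[A](columns: Set[String])(body: => A): A = {
    val saved = outerColumns.get()
    outerColumns.set(Some(columns))
    try body finally outerColumns.set(saved)
  }

  // Resolves one column name: the local scope wins, the outer scope is consulted
  // only when it is set, and anything else stays unresolved (marked with ').
  def resolve(name: String, local: Set[String]): String =
    if (local.contains(name)) s"$name#local"
    else if (outerColumns.get().exists(_.contains(name))) s"outer($name)"
    else s"'$name"

  def main(args: Array[String]): Unit = {
    println(resolve("k", Set("id")))
    println(withOuterPlan(Set("k")) { resolve("k", Set("id")) })
  }
}
```

With this shape, resolution code does not need the outer scope passed in as an argument; it only checks whether one is currently set, much as the new `ResolveOuterReferences` rule returns early when `AnalysisContext.get.outerPlan` is empty.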
