Skip to content

Commit b99d0c7

Browse files
committed
Address comments and fix bug.
1 parent 97837a4 commit b99d0c7

File tree

3 files changed

+9
-9
lines changed

3 files changed

+9
-9
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1125,7 +1125,8 @@ class Analyzer(
11251125
case sa @ Sort(_, _, AnalysisBarrier(child: Aggregate)) => sa
11261126
case sa @ Sort(_, _, child: Aggregate) => sa
11271127

1128-
case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
1128+
case s @ Sort(order, _, child)
1129+
if (!s.resolved || s.missingInput.nonEmpty) && child.resolved =>
11291130
val (newOrder, newChild) = resolveExprsAndAddMissingAttrs(order, child)
11301131
val ordering = newOrder.map(_.asInstanceOf[SortOrder])
11311132
if (child.output == newChild.output) {
@@ -1136,7 +1137,7 @@ class Analyzer(
11361137
Project(child.output, newSort)
11371138
}
11381139

1139-
case f @ Filter(cond, child) if !f.resolved && child.resolved =>
1140+
case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved =>
11401141
val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child)
11411142
if (child.output == newChild.output) {
11421143
f.copy(condition = newCond.head)
@@ -1149,8 +1150,9 @@ class Analyzer(
11491150

11501151
private def resolveExprsAndAddMissingAttrs(
11511152
exprs: Seq[Expression], plan: LogicalPlan): (Seq[Expression], LogicalPlan) = {
1153+
// An expression is possibly resolved but not in the output of `plan`.
11521154
if (exprs.forall(e => e.resolved && e.references.subsetOf(plan.outputSet))) {
1153-
// All given expressions are resolved, no need to continue anymore.
1155+
// All given expressions are resolved and in the plan's output, no need to continue anymore.
11541156
(exprs, plan)
11551157
} else {
11561158
plan match {
@@ -1163,8 +1165,8 @@ class Analyzer(
11631165
case p: Project =>
11641166
val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
11651167
val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
1166-
val missingAttrs = AttributeSet(newExprs) --
1167-
AttributeSet(maybeResolvedExprs.filter(_.references.subsetOf(p.outputSet)))
1168+
// The resolved attributes might not come from `p.child`. Need to filter it.
1169+
val missingAttrs = (AttributeSet(newExprs).intersect(p.child.outputSet)) -- p.outputSet
11681170
(newExprs, Project(p.projectList ++ missingAttrs, newChild))
11691171

11701172
case a @ Aggregate(groupExprs, aggExprs, child) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,7 @@ abstract class LogicalPlan
6060
* [[org.apache.spark.sql.catalyst.analysis.UnresolvedRelation UnresolvedRelation]]
6161
* should return `false`).
6262
*/
63-
lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved &&
64-
missingInput.isEmpty
63+
lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved
6564

6665
override protected def statePrefix = if (!resolved) "'" else super.statePrefix
6766

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
6060
}.nonEmpty
6161
)
6262

63-
!expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions &&
64-
missingInput.isEmpty
63+
!expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
6564
}
6665

6766
override def validConstraints: Set[Expression] =

0 commit comments

Comments
 (0)