Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2983,20 +2983,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
}
}

// Generator functions in Project/Aggregate may only be rewritten once every expression other
// than the generator functions themselves has been resolved. Holding off until then keeps this
// rule stable for different execution orders of analyzer rules. See also SPARK-47241.
private def canRewriteGenerator(namedExprs: Seq[NamedExpression]): Boolean = {
  // True when the expression, after trimming its non-top-level aliases, is an aliased generator.
  def isAliasedGenerator(expr: NamedExpression): Boolean =
    trimNonTopLevelAliases(expr) match {
      case AliasedGenerator(_, _, _) => true
      case _ => false
    }

  // The rewrite is blocked by any entry that is neither resolved nor an aliased generator.
  !namedExprs.exists(expr => !expr.resolved && !isAliasedGenerator(expr))
}

def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
_.containsPattern(GENERATOR), ruleId) {
case p @ Project(Seq(UnresolvedStarWithColumns(_, _, _)), _) =>
Expand All @@ -3015,8 +3001,11 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
val generators = aggList.filter(hasGenerator).map(trimAlias)
throw QueryCompilationErrors.moreThanOneGeneratorError(generators)

case Aggregate(groupList, aggList, child, _) if canRewriteGenerator(aggList) &&
aggList.exists(hasGenerator) =>
case Aggregate(groupList, aggList, child, _) if
aggList.forall {
case AliasedGenerator(_, _, _) => true
case other => other.resolved
} && aggList.exists(hasGenerator) =>
// If generator in the aggregate list was visited, set the boolean flag true.
var generatorVisited = false

Expand Down Expand Up @@ -3061,8 +3050,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
// first for replacing `Project` with `Aggregate`.
p

case p @ Project(projectList, child) if canRewriteGenerator(projectList) &&
projectList.exists(hasGenerator) =>
// The star will be expanded differently if we insert `Generate` under `Project` too early.
case p @ Project(projectList, child) if !projectList.exists(_.exists(_.isInstanceOf[Star])) =>
val (resolvedGenerator, newProjectList) = projectList
.map(trimNonTopLevelAliases)
.foldLeft((None: Option[Generate], Nil: Seq[NamedExpression])) { (res, e) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,14 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
// Lateral column alias does not have qualifiers. We always use the first name part to
// look up lateral column aliases.
val lowerCasedName = u.nameParts.head.toLowerCase(Locale.ROOT)
aliasMap.get(lowerCasedName).map {
aliasMap.get(lowerCasedName).filter {
// Do not resolve LCA with aliased `Generator`, as it will be rewritten by the rule
// `ExtractGenerator` with fresh output attribute IDs. The `Generator` will be pulled
// out and put in a `Generate` node below `Project`, so that we can resolve the column
// normally without LCA resolution.
case scala.util.Left(alias) => !alias.child.isInstanceOf[Generator]
case _ => true
}.map {
case scala.util.Left(alias) =>
if (alias.resolved) {
val resolvedAttr = resolveExpressionByPlanOutput(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1366,40 +1366,21 @@ class LateralColumnAliasSuite extends LateralColumnAliasSuiteBase {
sql("select 1 as a, a").queryExecution.assertAnalyzed()
}

test("SPARK-49349: Improve error message for LCA with Generate") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test now passes, so I am refactoring it a bit.

checkError(
exception = intercept[AnalysisException] {
sql(
s"""
|SELECT
| explode(split(name , ',')) AS new_name,
| new_name like 'a%'
|FROM $testTable
|""".stripMargin)
},
condition = "UNSUPPORTED_FEATURE.LATERAL_COLUMN_ALIAS_IN_GENERATOR",
sqlState = "0A000",
parameters = Map(
"lca" -> "`new_name`",
"generatorExpr" -> "\"unresolvedalias(lateralAliasReference(new_name) LIKE a%)\""))

checkError(
exception = intercept[AnalysisException] {
sql(
s"""
|SELECT
| explode_outer(from_json(name,'array<struct<values:string>>')) as newName,
| size(from_json(newName.values,'array<string>')) +
| size(array(from_json(newName.values,'map<string,string>'))) as size
|FROM $testTable
|""".stripMargin)
},
condition = "UNSUPPORTED_FEATURE.LATERAL_COLUMN_ALIAS_IN_GENERATOR",
sqlState = "0A000",
parameters = Map(
"lca" -> "`newName.values`",
"generatorExpr" -> ("\"(size(from_json(lateralAliasReference(newName.values), " +
"array<string>)) + size(array(from_json(lateralAliasReference(newName.values), " +
"map<string,string>)))) AS size\"")))
test("LateralColumnAlias with Generate") {
  // Case 1: inside a CTE, `c1` is referenced in the same SELECT list that defines it as the
  // alias of a generator.
  val cteQuery = "WITH cte AS (SELECT EXPLODE(ARRAY(1, 2, 3)) AS c1, c1) SELECT * FROM cte"
  val expectedCteRows = Seq(Row(1, 1), Row(2, 2), Row(3, 3))
  checkAnswer(sql(cteQuery), expectedCteRows)

  // Case 2: the generator alias `new_name` is referenced inside another expression (LIKE) of
  // the same SELECT list.
  val likeQuery =
    s"""
       |SELECT
       | explode(split(name , ',')) AS new_name,
       | new_name like 'a%'
       |FROM $testTable
       |""".stripMargin
  val expectedLikeRows = Seq(
    Row("alex", true),
    Row("amy", true),
    Row("cathy", false),
    Row("david", false),
    Row("jen", false))
  checkAnswer(sql(likeQuery), expectedLikeRows)
}
}