-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-24163][SPARK-24164][SQL] Support column list as the pivot column in Pivot #21720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -509,17 +509,39 @@ class Analyzer( | |
| def apply(plan: LogicalPlan): LogicalPlan = plan transform { | ||
| case p: Pivot if !p.childrenResolved || !p.aggregates.forall(_.resolved) | ||
| || (p.groupByExprsOpt.isDefined && !p.groupByExprsOpt.get.forall(_.resolved)) | ||
| || !p.pivotColumn.resolved => p | ||
| || !p.pivotColumn.resolved || !p.pivotValues.forall(_.resolved) => p | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By which test is the change covered?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before this PR, pivot values can only be single literals (no struct) so they have been converted to Literals in ASTBuilder. Now they are "expressions" and will be handled in this Analyzer rule. |
||
| case Pivot(groupByExprsOpt, pivotColumn, pivotValues, aggregates, child) => | ||
| // Check all aggregate expressions. | ||
| aggregates.foreach(checkValidAggregateExpression) | ||
| // Check all pivot values are literal and match pivot column data type. | ||
| val evalPivotValues = pivotValues.map { value => | ||
| val foldable = value match { | ||
| case Alias(v, _) => v.foldable | ||
| case _ => value.foldable | ||
| } | ||
| if (!foldable) { | ||
| throw new AnalysisException( | ||
| s"Literal expressions required for pivot values, found '$value'") | ||
| } | ||
| if (!Cast.canCast(value.dataType, pivotColumn.dataType)) { | ||
| throw new AnalysisException(s"Invalid pivot value '$value': " + | ||
| s"value data type ${value.dataType.simpleString} does not match " + | ||
| s"pivot column data type ${pivotColumn.dataType.catalogString}") | ||
| } | ||
| Cast(value, pivotColumn.dataType, Some(conf.sessionLocalTimeZone)).eval(EmptyRow) | ||
| } | ||
| // Group-by expressions coming from SQL are implicit and need to be deduced. | ||
| val groupByExprs = groupByExprsOpt.getOrElse( | ||
| (child.outputSet -- aggregates.flatMap(_.references) -- pivotColumn.references).toSeq) | ||
| val singleAgg = aggregates.size == 1 | ||
| def outputName(value: Literal, aggregate: Expression): String = { | ||
| val utf8Value = Cast(value, StringType, Some(conf.sessionLocalTimeZone)).eval(EmptyRow) | ||
| val stringValue: String = Option(utf8Value).map(_.toString).getOrElse("null") | ||
| def outputName(value: Expression, aggregate: Expression): String = { | ||
| val stringValue = value match { | ||
| case n: NamedExpression => n.name | ||
| case _ => | ||
| val utf8Value = | ||
| Cast(value, StringType, Some(conf.sessionLocalTimeZone)).eval(EmptyRow) | ||
| Option(utf8Value).map(_.toString).getOrElse("null") | ||
| } | ||
| if (singleAgg) { | ||
| stringValue | ||
| } else { | ||
|
|
@@ -534,15 +556,10 @@ class Analyzer( | |
| // Since evaluating |pivotValues| if statements for each input row can get slow this is an | ||
| // alternate plan that instead uses two steps of aggregation. | ||
| val namedAggExps: Seq[NamedExpression] = aggregates.map(a => Alias(a, a.sql)()) | ||
| val namedPivotCol = pivotColumn match { | ||
| case n: NamedExpression => n | ||
| case _ => Alias(pivotColumn, "__pivot_col")() | ||
| } | ||
| val bigGroup = groupByExprs :+ namedPivotCol | ||
| val bigGroup = groupByExprs ++ pivotColumn.references | ||
| val firstAgg = Aggregate(bigGroup, bigGroup ++ namedAggExps, child) | ||
| val castPivotValues = pivotValues.map(Cast(_, pivotColumn.dataType).eval(EmptyRow)) | ||
| val pivotAggs = namedAggExps.map { a => | ||
| Alias(PivotFirst(namedPivotCol.toAttribute, a.toAttribute, castPivotValues) | ||
| Alias(PivotFirst(pivotColumn, a.toAttribute, evalPivotValues) | ||
| .toAggregateExpression() | ||
| , "__pivot_" + a.sql)() | ||
| } | ||
|
|
@@ -557,8 +574,12 @@ class Analyzer( | |
| Project(groupByExprsAttr ++ pivotOutputs, secondAgg) | ||
| } else { | ||
| val pivotAggregates: Seq[NamedExpression] = pivotValues.flatMap { value => | ||
| def ifExpr(expr: Expression) = { | ||
| If(EqualNullSafe(pivotColumn, value), expr, Literal(null)) | ||
| def ifExpr(e: Expression) = { | ||
| If( | ||
| EqualNullSafe( | ||
| pivotColumn, | ||
| Cast(value, pivotColumn.dataType, Some(conf.sessionLocalTimeZone))), | ||
| e, Literal(null)) | ||
| } | ||
| aggregates.map { aggregate => | ||
| val filteredAggregate = aggregate.transformDown { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -630,11 +630,29 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging | |
| val aggregates = Option(ctx.aggregates).toSeq | ||
| .flatMap(_.namedExpression.asScala) | ||
| .map(typedVisit[Expression]) | ||
| val pivotColumn = UnresolvedAttribute.quoted(ctx.pivotColumn.getText) | ||
| val pivotValues = ctx.pivotValues.asScala.map(typedVisit[Expression]).map(Literal.apply) | ||
| val pivotColumn = if (ctx.pivotColumn.identifiers.size == 1) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there any reasons to handle one pivot column separately? And what happens if
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cannot be "0" as required by the parser rule. if size == 1, then it's single column as before, otherwise it's a construct. |
||
| UnresolvedAttribute.quoted(ctx.pivotColumn.identifier.getText) | ||
| } else { | ||
| CreateStruct( | ||
| ctx.pivotColumn.identifiers.asScala.map( | ||
| identifier => UnresolvedAttribute.quoted(identifier.getText))) | ||
| } | ||
| val pivotValues = ctx.pivotValues.asScala.map(visitPivotValue) | ||
| Pivot(None, pivotColumn, pivotValues, aggregates, query) | ||
| } | ||
|
|
||
| /** | ||
| * Create a Pivot column value with or without an alias. | ||
| */ | ||
| override def visitPivotValue(ctx: PivotValueContext): Expression = withOrigin(ctx) { | ||
| val e = expression(ctx.expression) | ||
| if (ctx.identifier != null) { | ||
| Alias(e, ctx.identifier.getText)() | ||
| } else { | ||
| e | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Add a [[Generate]] (Lateral View) to a logical plan. | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -700,7 +700,7 @@ case class GroupingSets( | |
| case class Pivot( | ||
| groupByExprsOpt: Option[Seq[NamedExpression]], | ||
| pivotColumn: Expression, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am asking just for my understanding. If you support multiple pivot columns, why it is not declared here explicitly:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No. Pivot column is one "expression" which can be either 1) a single column reference or 2) a struct of multiple columns. Either way the list of pivot values are many-to-one mapping for the pivot column. |
||
| pivotValues: Seq[Literal], | ||
| pivotValues: Seq[Expression], | ||
| aggregates: Seq[Expression], | ||
| child: LogicalPlan) extends UnaryNode { | ||
| override lazy val resolved = false // Pivot will be replaced after being resolved. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there any specific reasons to restrict the
pivotColumnbyidentifier? Are there any cases when expressions still don't supported properly with your changes?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main reason was that I implemented this pivot SQL support based on ORACLE grammar. Please take a look at https://docs.oracle.com/database/121/SQLRF/img_text/pivot_for_clause.htm. Note that the "column" here is different from "expression" (take this for reference: https://docs.oracle.com/cd/B28359_01/server.111/b28286/expressions002.htm#SQLRF52047).
Another reason was that relaxing it to an "expr" would require a lot more tests and handling of special cases.