-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-8641][SQL] Native Spark Window functions #9819
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c70a223
3fef5b4
b66ef4d
ada52b6
9918e78
6ebee15
b3f5a39
e95c42e
a43543e
20f5088
009280c
439b37b
31c6fb3
d7f13a0
b8ff6fb
5fefcf0
ba925fe
b4d9ca9
c181c8b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -77,6 +77,8 @@ class Analyzer( | |
| ResolveGenerate :: | ||
| ResolveFunctions :: | ||
| ResolveAliases :: | ||
| ResolveWindowOrder :: | ||
| ResolveWindowFrame :: | ||
| ExtractWindowExpressions :: | ||
| GlobalAggregates :: | ||
| ResolveAggregateFunctions :: | ||
|
|
@@ -127,14 +129,12 @@ class Analyzer( | |
| // Lookup WindowSpecDefinitions. This rule works with unresolved children. | ||
| case WithWindowDefinition(windowDefinitions, child) => | ||
| child.transform { | ||
| case plan => plan.transformExpressions { | ||
| case p => p.transformExpressions { | ||
| case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) => | ||
| val errorMessage = | ||
| s"Window specification $windowName is not defined in the WINDOW clause." | ||
| val windowSpecDefinition = | ||
| windowDefinitions | ||
| .get(windowName) | ||
| .getOrElse(failAnalysis(errorMessage)) | ||
| windowDefinitions.getOrElse(windowName, failAnalysis(errorMessage)) | ||
| WindowExpression(c, windowSpecDefinition) | ||
| } | ||
| } | ||
|
|
@@ -572,6 +572,10 @@ class Analyzer( | |
| AggregateExpression(max, Complete, isDistinct = false) | ||
| case min: Min if isDistinct => | ||
| AggregateExpression(min, Complete, isDistinct = false) | ||
| // AggregateWindowFunctions are AggregateFunctions that can only be evaluated within | ||
| // the context of a Window clause. They do not need to be wrapped in an | ||
| // AggregateExpression. | ||
| case wf: AggregateWindowFunction => wf | ||
| // We get an aggregate function, we need to wrap it in an AggregateExpression. | ||
| case agg: AggregateFunction => AggregateExpression(agg, Complete, isDistinct) | ||
| // This function is not an aggregate function, just return the resolved one. | ||
|
|
@@ -592,11 +596,17 @@ class Analyzer( | |
| } | ||
|
|
||
| def containsAggregates(exprs: Seq[Expression]): Boolean = { | ||
| exprs.foreach(_.foreach { | ||
| case agg: AggregateExpression => return true | ||
| case _ => | ||
| }) | ||
| false | ||
| // Collect all Windowed Aggregate Expressions. | ||
| val windowedAggExprs = exprs.flatMap { expr => | ||
| expr.collect { | ||
| case WindowExpression(ae: AggregateExpression, _) => ae | ||
| } | ||
| }.toSet | ||
|
|
||
| // Find the first Aggregate Expression that is not Windowed. | ||
| exprs.exists(_.collectFirst { | ||
| case ae: AggregateExpression if !windowedAggExprs.contains(ae) => ae | ||
| }.isDefined) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -870,26 +880,37 @@ class Analyzer( | |
|
|
||
| // Now, we extract regular expressions from expressionsWithWindowFunctions | ||
| // by using extractExpr. | ||
| val seenWindowAggregates = new ArrayBuffer[AggregateExpression] | ||
| val newExpressionsWithWindowFunctions = expressionsWithWindowFunctions.map { | ||
| _.transform { | ||
| // Extracts children expressions of a WindowFunction (input parameters of | ||
| // a WindowFunction). | ||
| case wf : WindowFunction => | ||
| val newChildren = wf.children.map(extractExpr(_)) | ||
| val newChildren = wf.children.map(extractExpr) | ||
| wf.withNewChildren(newChildren) | ||
|
|
||
| // Extracts expressions from the partition spec and order spec. | ||
| case wsc @ WindowSpecDefinition(partitionSpec, orderSpec, _) => | ||
| val newPartitionSpec = partitionSpec.map(extractExpr(_)) | ||
| val newPartitionSpec = partitionSpec.map(extractExpr) | ||
| val newOrderSpec = orderSpec.map { so => | ||
| val newChild = extractExpr(so.child) | ||
| so.copy(child = newChild) | ||
| } | ||
| wsc.copy(partitionSpec = newPartitionSpec, orderSpec = newOrderSpec) | ||
|
|
||
| // Extract Windowed AggregateExpression | ||
| case we @ WindowExpression( | ||
| AggregateExpression(function, mode, isDistinct), | ||
| spec: WindowSpecDefinition) => | ||
| val newChildren = function.children.map(extractExpr) | ||
| val newFunction = function.withNewChildren(newChildren).asInstanceOf[AggregateFunction] | ||
| val newAgg = AggregateExpression(newFunction, mode, isDistinct) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| seenWindowAggregates += newAgg | ||
| WindowExpression(newAgg, spec) | ||
|
|
||
| // Extracts AggregateExpression. For example, for SUM(x) - Sum(y) OVER (...), | ||
| // we need to extract SUM(x). | ||
| case agg: AggregateExpression => | ||
| case agg: AggregateExpression if !seenWindowAggregates.contains(agg) => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this work in this case?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should, only the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you want this tested?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I actually couldn't find a case for this. I'll add one to the DataFrameWindowSuite.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| val withName = Alias(agg, s"_w${extractedExprBuffer.length}")() | ||
| extractedExprBuffer += withName | ||
| withName.toAttribute | ||
|
|
@@ -1097,6 +1118,42 @@ class Analyzer( | |
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Check and add proper window frames for all window functions. | ||
| */ | ||
| object ResolveWindowFrame extends Rule[LogicalPlan] { | ||
| def apply(plan: LogicalPlan): LogicalPlan = plan transform { | ||
| case logical: LogicalPlan => logical transformExpressions { | ||
| case WindowExpression(wf: WindowFunction, | ||
| WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) | ||
| if wf.frame != UnspecifiedFrame && wf.frame != f => | ||
| failAnalysis(s"Window Frame $f must match the required frame ${wf.frame}") | ||
| case WindowExpression(wf: WindowFunction, | ||
| s @ WindowSpecDefinition(_, o, UnspecifiedFrame)) | ||
| if wf.frame != UnspecifiedFrame => | ||
| WindowExpression(wf, s.copy(frameSpecification = wf.frame)) | ||
| case we @ WindowExpression(e, s @ WindowSpecDefinition(_, o, UnspecifiedFrame)) => | ||
| val frame = SpecifiedWindowFrame.defaultWindowFrame(o.nonEmpty, acceptWindowFrame = true) | ||
| we.copy(windowSpec = s.copy(frameSpecification = frame)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Check and add order to [[AggregateWindowFunction]]s. | ||
| */ | ||
| object ResolveWindowOrder extends Rule[LogicalPlan] { | ||
| def apply(plan: LogicalPlan): LogicalPlan = plan transform { | ||
| case logical: LogicalPlan => logical transformExpressions { | ||
| case WindowExpression(wf: WindowFunction, spec) if spec.orderSpec.isEmpty => | ||
| failAnalysis(s"WindowFunction $wf requires window to be ordered") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it required?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All window functions (LEAD, LAG, ROW_NUMBER, RANK, DENSE_RANK, PERCENT_RANK, NTILE, CUME_DIST) imply ordering. Without ordering their results would be non-deterministic and not very usefull. Only regular Aggregates make sense to use in without ordering. |
||
| case WindowExpression(rank: RankLike, spec) if spec.resolved => | ||
| val order = spec.orderSpec.map(_.child) | ||
| WindowExpression(rank.withOrder(order), spec) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -70,15 +70,32 @@ trait CheckAnalysis { | |
| failAnalysis( | ||
| s"invalid cast from ${c.child.dataType.simpleString} to ${c.dataType.simpleString}") | ||
|
|
||
| case WindowExpression(UnresolvedWindowFunction(name, _), _) => | ||
| failAnalysis( | ||
| s"Could not resolve window function '$name'. " + | ||
| "Note that, using window functions currently requires a HiveContext") | ||
| case w @ WindowExpression(AggregateExpression(_, _, true), _) => | ||
| failAnalysis(s"Distinct window functions are not supported: $w") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does Hive support that?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think it is... @yhuai?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hive's parser will silently drop the distinct keyword when an aggregate function is used as a window function. |
||
|
|
||
| case w @ WindowExpression(_: OffsetWindowFunction, WindowSpecDefinition(_, order, | ||
| SpecifiedWindowFrame(frame, | ||
| FrameBoundary(l), | ||
| FrameBoundary(h)))) | ||
| if order.isEmpty || frame != RowFrame || l != h => | ||
| failAnalysis("An offset window function can only be evaluated in an ordered " + | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we allow
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, an
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nvm, they will not match, sorry. |
||
| s"row-based window frame with a single offset: $w") | ||
|
|
||
| case w @ WindowExpression(e, s) => | ||
| // Only allow window functions with an aggregate expression or an offset window | ||
| // function. | ||
| e match { | ||
| case _: AggregateExpression | _: OffsetWindowFunction | _: AggregateWindowFunction => | ||
| case _ => | ||
| failAnalysis(s"Expression '$e' not supported within a window function.") | ||
| } | ||
| // Make sure the window specification is valid. | ||
| s.validate match { | ||
| case Some(m) => | ||
| failAnalysis(s"Window specification $s is not valid because $m") | ||
| case None => w | ||
| } | ||
|
|
||
| case w @ WindowExpression(windowFunction, windowSpec) if windowSpec.validate.nonEmpty => | ||
| // The window spec is not valid. | ||
| val reason = windowSpec.validate.get | ||
| failAnalysis(s"Window specification $windowSpec is not valid because $reason") | ||
| } | ||
|
|
||
| operator match { | ||
|
|
@@ -204,10 +221,12 @@ trait CheckAnalysis { | |
| s"unresolved operator ${operator.simpleString}") | ||
|
|
||
| case o if o.expressions.exists(!_.deterministic) && | ||
| !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] & !o.isInstanceOf[Aggregate] => | ||
| !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] && | ||
| !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] => | ||
| // The rule above is used to check Aggregate operator. | ||
| failAnalysis( | ||
| s"""nondeterministic expressions are only allowed in Project or Filter, found: | ||
| s"""nondeterministic expressions are only allowed in | ||
| |Project, Filter, Aggregate or Window, found: | ||
| | ${o.expressions.map(_.prettyString).mkString(",")} | ||
| |in operator ${operator.simpleString} | ||
| """.stripMargin) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions.aggregate | |
|
|
||
| import org.apache.spark.sql.catalyst.analysis.TypeCheckResult | ||
| import org.apache.spark.sql.catalyst.expressions._ | ||
| import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext} | ||
| import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenFallback, GeneratedExpressionCode, CodeGenContext} | ||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.types._ | ||
|
|
||
|
|
@@ -144,9 +144,6 @@ sealed abstract class AggregateFunction extends Expression with ImplicitCastInpu | |
| */ | ||
| def defaultResult: Option[Literal] = None | ||
|
|
||
| override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = | ||
| throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") | ||
|
|
||
| /** | ||
| * Wraps this [[AggregateFunction]] in an [[AggregateExpression]] because | ||
| * [[AggregateExpression]] is the container of an [[AggregateFunction]], aggregation mode, | ||
|
|
@@ -187,7 +184,7 @@ sealed abstract class AggregateFunction extends Expression with ImplicitCastInpu | |
| * `inputAggBufferOffset`, but not on the correctness of the attribute ids in `aggBufferAttributes` | ||
| * and `inputAggBufferAttributes`. | ||
| */ | ||
| abstract class ImperativeAggregate extends AggregateFunction { | ||
| abstract class ImperativeAggregate extends AggregateFunction with CodegenFallback { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a small trick to allow us to add the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Recently I also tried this trick, but failed, because the eval() usually only use attributes in the buffer, but BoundReference will try to look attributes for child of AggregateFunction, which may not exists. Could you have a test case for it? (using AggregateFunction as window function)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should be fine as long as we add already bound I use |
||
|
|
||
| /** | ||
| * The offset of this function's first buffer value in the underlying shared mutable aggregation | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could
collectFirstalso be replaced byexists?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no
existsmethod inTreeNodeorExpression.