Skip to content

Commit f76ee10

Browse files
hvanhovellyhuai
authored andcommitted
[SPARK-8641][SPARK-12455][SQL] Native Spark Window functions - Follow-up (docs & tests)
This PR is a follow-up for PR #9819. It adds documentation for the window functions and a couple of NULL tests. The documentation was largely based on the documentation in (the source of) Hive and Presto: * https://prestodb.io/docs/current/functions/window.html * https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics I am not sure if we need to add the licenses of these two projects to the licenses directory. They are both under the ASL. srowen any thoughts? cc yhuai Author: Herman van Hovell <[email protected]> Closes #10402 from hvanhovell/SPARK-8641-docs.
1 parent b244297 commit f76ee10

File tree

3 files changed

+162
-3
lines changed

3 files changed

+162
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala

Lines changed: 127 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,8 @@ abstract class OffsetWindowFunction
314314
val offset: Expression
315315

316316
/**
317-
* Direction (above = 1/below = -1) of the number of rows between the current row and the row
318-
* where the input expression is evaluated.
317+
* Direction of the number of rows between the current row and the row where the input expression
318+
* is evaluated.
319319
*/
320320
val direction: SortDirection
321321

@@ -327,7 +327,7 @@ abstract class OffsetWindowFunction
327327
* both the input and the default expression are foldable, the result is still not foldable due to
328328
* the frame.
329329
*/
330-
override def foldable: Boolean = input.foldable && (default == null || default.foldable)
330+
override def foldable: Boolean = false
331331

332332
override def nullable: Boolean = default == null || default.nullable
333333

@@ -353,6 +353,21 @@ abstract class OffsetWindowFunction
353353
override def toString: String = s"$prettyName($input, $offset, $default)"
354354
}
355355

356+
/**
357+
* The Lead function returns the value of 'x' at 'offset' rows after the current row in the window.
358+
* Offsets start at 0, which is the current row. The offset must be constant integer value. The
359+
* default offset is 1. When the value of 'x' is null at the offset, or when the offset is larger
360+
* than the window, the default expression is evaluated.
361+
*
362+
* This documentation has been based upon similar documentation for the Hive and Presto projects.
363+
*
364+
* @param input expression to evaluate 'offset' rows after the current row.
365+
* @param offset rows to jump ahead in the partition.
366+
* @param default to use when the input value is null or when the offset is larger than the window.
367+
*/
368+
@ExpressionDescription(usage =
369+
"""_FUNC_(input, offset, default) - LEAD returns the value of 'x' at 'offset' rows after the
370+
current row in the window""")
356371
case class Lead(input: Expression, offset: Expression, default: Expression)
357372
extends OffsetWindowFunction {
358373

@@ -365,6 +380,21 @@ case class Lead(input: Expression, offset: Expression, default: Expression)
365380
override val direction = Ascending
366381
}
367382

383+
/**
384+
* The Lag function returns the value of 'x' at 'offset' rows before the current row in the window.
385+
* Offsets start at 0, which is the current row. The offset must be constant integer value. The
386+
* default offset is 1. When the value of 'x' is null at the offset, or when the offset is smaller
387+
* than the window, the default expression is evaluated.
388+
*
389+
* This documentation has been based upon similar documentation for the Hive and Presto projects.
390+
*
391+
* @param input expression to evaluate 'offset' rows before the current row.
392+
* @param offset rows to jump back in the partition.
393+
* @param default to use when the input value is null or when the offset is smaller than the window.
394+
*/
395+
@ExpressionDescription(usage =
396+
"""_FUNC_(input, offset, default) - LAG returns the value of 'x' at 'offset' rows before the
397+
current row in the window""")
368398
case class Lag(input: Expression, offset: Expression, default: Expression)
369399
extends OffsetWindowFunction {
370400

@@ -409,10 +439,31 @@ object SizeBasedWindowFunction {
409439
val n = AttributeReference("window__partition__size", IntegerType, nullable = false)()
410440
}
411441

442+
/**
443+
* The RowNumber function computes a unique, sequential number to each row, starting with one,
444+
* according to the ordering of rows within the window partition.
445+
*
446+
* This documentation has been based upon similar documentation for the Hive and Presto projects.
447+
*/
448+
@ExpressionDescription(usage =
449+
"""_FUNC_() - The ROW_NUMBER() function assigns a unique, sequential
450+
number to each row, starting with one, according to the ordering of rows within the window
451+
partition.""")
412452
case class RowNumber() extends RowNumberLike {
413453
override val evaluateExpression = rowNumber
414454
}
415455

456+
/**
457+
* The CumeDist function computes the position of a value relative to a all values in the partition.
458+
* The result is the number of rows preceding or equal to the current row in the ordering of the
459+
* partition divided by the total number of rows in the window partition. Any tie values in the
460+
* ordering will evaluate to the same position.
461+
*
462+
* This documentation has been based upon similar documentation for the Hive and Presto projects.
463+
*/
464+
@ExpressionDescription(usage =
465+
"""_FUNC_() - The CUME_DIST() function computes the position of a value relative to a all values
466+
in the partition.""")
416467
case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
417468
override def dataType: DataType = DoubleType
418469
// The frame for CUME_DIST is Range based instead of Row based, because CUME_DIST must
@@ -421,6 +472,30 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
421472
override val evaluateExpression = Divide(Cast(rowNumber, DoubleType), Cast(n, DoubleType))
422473
}
423474

475+
/**
476+
* The NTile function divides the rows for each window partition into 'n' buckets ranging from 1 to
477+
* at most 'n'. Bucket values will differ by at most 1. If the number of rows in the partition does
478+
* not divide evenly into the number of buckets, then the remainder values are distributed one per
479+
* bucket, starting with the first bucket.
480+
*
481+
* The NTile function is particularly useful for the calculation of tertiles, quartiles, deciles and
482+
* other common summary statistics
483+
*
484+
* The function calculates two variables during initialization: The size of a regular bucket, and
485+
* the number of buckets that will have one extra row added to it (when the rows do not evenly fit
486+
* into the number of buckets); both variables are based on the size of the current partition.
487+
* During the calculation process the function keeps track of the current row number, the current
488+
* bucket number, and the row number at which the bucket will change (bucketThreshold). When the
489+
* current row number reaches bucket threshold, the bucket value is increased by one and the the
490+
* threshold is increased by the bucket size (plus one extra if the current bucket is padded).
491+
*
492+
* This documentation has been based upon similar documentation for the Hive and Presto projects.
493+
*
494+
* @param buckets number of buckets to divide the rows in. Default value is 1.
495+
*/
496+
@ExpressionDescription(usage =
497+
"""_FUNC_(x) - The NTILE(n) function divides the rows for each window partition into 'n' buckets
498+
ranging from 1 to at most 'n'.""")
424499
case class NTile(buckets: Expression) extends RowNumberLike with SizeBasedWindowFunction {
425500
def this() = this(Literal(1))
426501

@@ -474,6 +549,8 @@ case class NTile(buckets: Expression) extends RowNumberLike with SizeBasedWindow
474549
* the order of the window in which is processed. For instance, when the value of 'x' changes in a
475550
* window ordered by 'x' the rank function also changes. The size of the change of the rank function
476551
* is (typically) not dependent on the size of the change in 'x'.
552+
*
553+
* This documentation has been based upon similar documentation for the Hive and Presto projects.
477554
*/
478555
abstract class RankLike extends AggregateWindowFunction {
479556
override def inputTypes: Seq[AbstractDataType] = children.map(_ => AnyDataType)
@@ -513,11 +590,41 @@ abstract class RankLike extends AggregateWindowFunction {
513590
def withOrder(order: Seq[Expression]): RankLike
514591
}
515592

593+
/**
594+
* The Rank function computes the rank of a value in a group of values. The result is one plus the
595+
* number of rows preceding or equal to the current row in the ordering of the partition. Tie values
596+
* will produce gaps in the sequence.
597+
*
598+
* This documentation has been based upon similar documentation for the Hive and Presto projects.
599+
*
600+
* @param children to base the rank on; a change in the value of one the children will trigger a
601+
* change in rank. This is an internal parameter and will be assigned by the
602+
* Analyser.
603+
*/
604+
@ExpressionDescription(usage =
605+
"""_FUNC_() - RANK() computes the rank of a value in a group of values. The result is one plus
606+
the number of rows preceding or equal to the current row in the ordering of the partition. Tie
607+
values will produce gaps in the sequence.""")
516608
case class Rank(children: Seq[Expression]) extends RankLike {
517609
def this() = this(Nil)
518610
override def withOrder(order: Seq[Expression]): Rank = Rank(order)
519611
}
520612

613+
/**
614+
* The DenseRank function computes the rank of a value in a group of values. The result is one plus
615+
* the previously assigned rank value. Unlike Rank, DenseRank will not produce gaps in the ranking
616+
* sequence.
617+
*
618+
* This documentation has been based upon similar documentation for the Hive and Presto projects.
619+
*
620+
* @param children to base the rank on; a change in the value of one the children will trigger a
621+
* change in rank. This is an internal parameter and will be assigned by the
622+
* Analyser.
623+
*/
624+
@ExpressionDescription(usage =
625+
"""_FUNC_() - The DENSE_RANK() function computes the rank of a value in a group of values. The
626+
result is one plus the previously assigned rank value. Unlike Rank, DenseRank will not produce
627+
gaps in the ranking sequence.""")
521628
case class DenseRank(children: Seq[Expression]) extends RankLike {
522629
def this() = this(Nil)
523630
override def withOrder(order: Seq[Expression]): DenseRank = DenseRank(order)
@@ -527,6 +634,23 @@ case class DenseRank(children: Seq[Expression]) extends RankLike {
527634
override val initialValues = zero +: orderInit
528635
}
529636

637+
/**
638+
* The PercentRank function computes the percentage ranking of a value in a group of values. The
639+
* result the rank of the minus one divided by the total number of rows in the partitiion minus one:
640+
* (r - 1) / (n - 1). If a partition only contains one row, the function will return 0.
641+
*
642+
* The PercentRank function is similar to the CumeDist function, but it uses rank values instead of
643+
* row counts in the its numerator.
644+
*
645+
* This documentation has been based upon similar documentation for the Hive and Presto projects.
646+
*
647+
* @param children to base the rank on; a change in the value of one the children will trigger a
648+
* change in rank. This is an internal parameter and will be assigned by the
649+
* Analyser.
650+
*/
651+
@ExpressionDescription(usage =
652+
"""_FUNC_() - PERCENT_RANK() The PercentRank function computes the percentage ranking of a value
653+
in a group of values.""")
530654
case class PercentRank(children: Seq[Expression]) extends RankLike with SizeBasedWindowFunction {
531655
def this() = this(Nil)
532656
override def withOrder(order: Seq[Expression]): PercentRank = PercentRank(order)

sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,4 +292,24 @@ class DataFrameWindowSuite extends QueryTest with SharedSQLContext {
292292
Row("b", 3, 8, 32),
293293
Row("b", 2, 4, 8)))
294294
}
295+
296+
test("null inputs") {
297+
val df = Seq(("a", 1), ("a", 1), ("a", 2), ("a", 2), ("b", 4), ("b", 3), ("b", 2))
298+
.toDF("key", "value")
299+
val window = Window.orderBy()
300+
checkAnswer(
301+
df.select(
302+
$"key",
303+
$"value",
304+
avg(lit(null)).over(window),
305+
sum(lit(null)).over(window)),
306+
Seq(
307+
Row("a", 1, null, null),
308+
Row("a", 1, null, null),
309+
Row("a", 2, null, null),
310+
Row("a", 2, null, null),
311+
Row("b", 4, null, null),
312+
Row("b", 3, null, null),
313+
Row("b", 2, null, null)))
314+
}
295315
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,4 +227,19 @@ class WindowQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleto
227227
Row("Manufacturer#5", "almond azure blanched chiffon midnight", 23, 315.9225931564038, 315.9225931564038, 46, 99807.08486666666, -0.9978877469246935, -5664.856666666666)))
228228
// scalastyle:on
229229
}
230+
231+
test("null arguments") {
232+
checkAnswer(sql("""
233+
|select p_mfgr, p_name, p_size,
234+
|sum(null) over(distribute by p_mfgr sort by p_name) as sum,
235+
|avg(null) over(distribute by p_mfgr sort by p_name) as avg
236+
|from part
237+
""".stripMargin),
238+
sql("""
239+
|select p_mfgr, p_name, p_size,
240+
|null as sum,
241+
|null as avg
242+
|from part
243+
""".stripMargin))
244+
}
230245
}

0 commit comments

Comments
 (0)