Skip to content

Commit df10658

Browse files
hvanhovell authored and
yhuai committed
[SPARK-16749][SQL] Simplify processing logic in LEAD/LAG processing.
## What changes were proposed in this pull request? The logic for LEAD/LAG processing is more complex than it needs to be. This PR fixes that. ## How was this patch tested? Existing tests. Author: Herman van Hovell <[email protected]> Closes #14376 from hvanhovell/SPARK-16749.
1 parent 53d1c78 commit df10658

File tree

1 file changed

+18
-35
lines changed

1 file changed

+18
-35
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala

Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,8 @@ case class WindowExec(
209209
new OffsetWindowFunctionFrame(
210210
target,
211211
ordinal,
212-
functions,
212+
// OFFSET frame functions are guaranteed to be OffsetWindowFunctions.
213+
functions.map(_.asInstanceOf[OffsetWindowFunction]),
213214
child.output,
214215
(expressions, schema) =>
215216
newMutableProjection(expressions, schema, subexpressionEliminationEnabled),
@@ -557,6 +558,9 @@ private[execution] abstract class WindowFunctionFrame {
557558
* The offset window frame calculates frames containing LEAD/LAG statements.
558559
*
559560
* @param target to write results to.
561+
* @param ordinal the ordinal is the starting offset at which the results of the window frame get
562+
* written into the (shared) target row. The result of the frame expression with
563+
* index 'i' will be written to the 'ordinal' + 'i' position in the target row.
560564
* @param expressions to shift a number of rows.
561565
* @param inputSchema required for creating a projection.
562566
* @param newMutableProjection function used to create the projection.
@@ -565,7 +569,7 @@ private[execution] abstract class WindowFunctionFrame {
565569
private[execution] final class OffsetWindowFunctionFrame(
566570
target: MutableRow,
567571
ordinal: Int,
568-
expressions: Array[Expression],
572+
expressions: Array[OffsetWindowFunction],
569573
inputSchema: Seq[Attribute],
570574
newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection,
571575
offset: Int) extends WindowFunctionFrame {
@@ -576,25 +580,15 @@ private[execution] final class OffsetWindowFunctionFrame(
576580
/** Index of the input row currently used for output. */
577581
private[this] var inputIndex = 0
578582

579-
/** Row used when there is no valid input. */
580-
private[this] val emptyRow = new GenericInternalRow(inputSchema.size)
581-
582-
/** Row used to combine the offset and the current row. */
583-
private[this] val join = new JoinedRow
584-
585583
/**
586584
* Create the projection used when the offset row exists.
587585
* Please note that this projection always respects null input values (like PostgreSQL).
588586
*/
589587
private[this] val projection = {
590588
// Collect the expressions and bind them.
591589
val inputAttrs = inputSchema.map(_.withNullability(true))
592-
val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map {
593-
case e: OffsetWindowFunction =>
594-
val input = BindReferences.bindReference(e.input, inputAttrs)
595-
input
596-
case e =>
597-
BindReferences.bindReference(e, inputAttrs)
590+
val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { e =>
591+
BindReferences.bindReference(e.input, inputAttrs)
598592
}
599593

600594
// Create the projection.
@@ -605,23 +599,14 @@ private[execution] final class OffsetWindowFunctionFrame(
605599
private[this] val fillDefaultValue = {
606600
// Collect the expressions and bind them.
607601
val inputAttrs = inputSchema.map(_.withNullability(true))
608-
val numInputAttributes = inputAttrs.size
609-
val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map {
610-
case e: OffsetWindowFunction =>
611-
if (e.default == null || e.default.foldable && e.default.eval() == null) {
612-
// The default value is null.
613-
Literal.create(null, e.dataType)
614-
} else {
615-
// The default value is an expression.
616-
val default = BindReferences.bindReference(e.default, inputAttrs).transform {
617-
// Shift the input reference to its default version.
618-
case BoundReference(o, dataType, nullable) =>
619-
BoundReference(o + numInputAttributes, dataType, nullable)
620-
}
621-
default
622-
}
623-
case e =>
624-
BindReferences.bindReference(e, inputAttrs)
602+
val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { e =>
603+
if (e.default == null || e.default.foldable && e.default.eval() == null) {
604+
// The default value is null.
605+
Literal.create(null, e.dataType)
606+
} else {
607+
// The default value is an expression.
608+
BindReferences.bindReference(e.default, inputAttrs)
609+
}
625610
}
626611

627612
// Create the projection.
@@ -642,12 +627,10 @@ private[execution] final class OffsetWindowFunctionFrame(
642627
override def write(index: Int, current: InternalRow): Unit = {
643628
if (inputIndex >= 0 && inputIndex < input.size) {
644629
val r = input.next()
645-
join(r, current)
646-
projection(join)
630+
projection(r)
647631
} else {
648-
join(emptyRow, current)
649632
// Use default values since the offset row does not exist.
650-
fillDefaultValue(join)
633+
fillDefaultValue(current)
651634
}
652635
inputIndex += 1
653636
}

0 commit comments

Comments
 (0)