Skip to content

Commit 427c179

Browse files
committed
Simplify processing logic in offset frame processing.
1 parent 5b8e848 commit 427c179

File tree

1 file changed

+14
-35
lines changed

1 file changed

+14
-35
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala

Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ case class WindowExec(
209209
new OffsetWindowFunctionFrame(
210210
target,
211211
ordinal,
212-
functions,
212+
functions.map(_.asInstanceOf[OffsetWindowFunction]),
213213
child.output,
214214
(expressions, schema) =>
215215
newMutableProjection(expressions, schema, subexpressionEliminationEnabled),
@@ -565,7 +565,7 @@ private[execution] abstract class WindowFunctionFrame {
565565
private[execution] final class OffsetWindowFunctionFrame(
566566
target: MutableRow,
567567
ordinal: Int,
568-
expressions: Array[Expression],
568+
expressions: Array[OffsetWindowFunction],
569569
inputSchema: Seq[Attribute],
570570
newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection,
571571
offset: Int) extends WindowFunctionFrame {
@@ -576,25 +576,15 @@ private[execution] final class OffsetWindowFunctionFrame(
576576
/** Index of the input row currently used for output. */
577577
private[this] var inputIndex = 0
578578

579-
/** Row used when there is no valid input. */
580-
private[this] val emptyRow = new GenericInternalRow(inputSchema.size)
581-
582-
/** Row used to combine the offset and the current row. */
583-
private[this] val join = new JoinedRow
584-
585579
/**
586580
* Create the projection used when the offset row exists.
587581
* Please note that this project always respect null input values (like PostgreSQL).
588582
*/
589583
private[this] val projection = {
590584
// Collect the expressions and bind them.
591585
val inputAttrs = inputSchema.map(_.withNullability(true))
592-
val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map {
593-
case e: OffsetWindowFunction =>
594-
val input = BindReferences.bindReference(e.input, inputAttrs)
595-
input
596-
case e =>
597-
BindReferences.bindReference(e, inputAttrs)
586+
val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { e =>
587+
BindReferences.bindReference(e.input, inputAttrs)
598588
}
599589

600590
// Create the projection.
@@ -605,23 +595,14 @@ private[execution] final class OffsetWindowFunctionFrame(
605595
private[this] val fillDefaultValue = {
606596
// Collect the expressions and bind them.
607597
val inputAttrs = inputSchema.map(_.withNullability(true))
608-
val numInputAttributes = inputAttrs.size
609-
val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map {
610-
case e: OffsetWindowFunction =>
611-
if (e.default == null || e.default.foldable && e.default.eval() == null) {
612-
// The default value is null.
613-
Literal.create(null, e.dataType)
614-
} else {
615-
// The default value is an expression.
616-
val default = BindReferences.bindReference(e.default, inputAttrs).transform {
617-
// Shift the input reference to its default version.
618-
case BoundReference(o, dataType, nullable) =>
619-
BoundReference(o + numInputAttributes, dataType, nullable)
620-
}
621-
default
622-
}
623-
case e =>
624-
BindReferences.bindReference(e, inputAttrs)
598+
val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { e =>
599+
if (e.default == null || e.default.foldable && e.default.eval() == null) {
600+
// The default value is null.
601+
Literal.create(null, e.dataType)
602+
} else {
603+
// The default value is an expression.
604+
BindReferences.bindReference(e.default, inputAttrs)
605+
}
625606
}
626607

627608
// Create the projection.
@@ -642,12 +623,10 @@ private[execution] final class OffsetWindowFunctionFrame(
642623
override def write(index: Int, current: InternalRow): Unit = {
643624
if (inputIndex >= 0 && inputIndex < input.size) {
644625
val r = input.next()
645-
join(r, current)
646-
projection(join)
626+
projection(r)
647627
} else {
648-
join(emptyRow, current)
649628
// Use default values since the offset row does not exist.
650-
fillDefaultValue(join)
629+
fillDefaultValue(current)
651630
}
652631
inputIndex += 1
653632
}

0 commit comments

Comments
 (0)