Skip to content

Commit 4a1f9d1

Browse files
committed
Address comments
1 parent c153cf7 commit 4a1f9d1

File tree

2 files changed

+8
-3
lines changed

2 files changed

+8
-3
lines changed

flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LagAggFunction.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ public void accumulate(LagAcc<T> acc, T value) throws Exception {
100100
}
101101

102102
public void accumulate(LagAcc<T> acc, T value, int offset) throws Exception {
103+
if (offset < 0) {
104+
throw new TableException(String.format("Offset(%d) should be positive.", offset));
105+
}
106+
103107
acc.offset = offset;
104108
accumulate(acc, value);
105109
}

flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,10 @@ import scala.collection.JavaConversions._
4545
* as subclasses of [[SqlAggFunction]] in Calcite but not as [[BridgingSqlAggFunction]]. The factory
4646
* returns [[DeclarativeAggregateFunction]] or [[BuiltInAggregateFunction]].
4747
*
48-
* @param inputType the input rel data type
49-
* @param orderKeyIdx the indexes of order key (null when is not over agg)
50-
* @param needRetraction true if need retraction
48+
* @param inputRowType the input row type
49+
* @param orderKeyIndexes the indexes of order key (null when is not over agg)
50+
* @param aggCallNeedRetractions true if need retraction
51+
* @param isBounded true if the source is bounded source
5152
*/
5253
class AggFunctionFactory(
5354
inputRowType: RowType,

0 commit comments

Comments
 (0)