Skip to content

Commit f511680

Browse files
beyond1920godfreyhe
authored andcommitted
[FLINK-22099][table-planner-blink] Fix bug about throwing ArrayIndexOutOfBoundsException when window join deals with semi/anti queries
This closes #15477
1 parent 6dd276e commit f511680

File tree

4 files changed

+546
-119
lines changed

4 files changed

+546
-119
lines changed

flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowJoin.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ class StreamPhysicalWindowJoin(
9797
.item("joinType", joinSpec.getJoinType)
9898
.item("where",
9999
getExpressionString(
100-
remainingCondition, getRowType.getFieldNames.toList, None, preferExpressionFormat(pw)))
100+
remainingCondition, inputRowType.getFieldNames.toList, None, preferExpressionFormat(pw)))
101101
.item("select", getRowType.getFieldNames.mkString(", "))
102102
}
103103

flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ object WindowJoinUtil {
9696
val joinInfo = join.analyzeCondition()
9797
val (remainLeftKeys, remainRightKeys, remainCondition) = if (
9898
windowStartEqualityLeftKeys.nonEmpty || windowEndEqualityLeftKeys.nonEmpty) {
99-
val joinFieldsType = join.getRowType.getFieldList
99+
val leftChildFieldsType = join.getLeft.getRowType.getFieldList
100+
val rightChildFieldsType = join.getRight.getRowType.getFieldList
100101
val leftFieldCnt = join.getLeft.getRowType.getFieldCount
101102
val rexBuilder = join.getCluster.getRexBuilder
102103
val remainEquals = mutable.ArrayBuffer[RexNode]()
@@ -106,10 +107,10 @@ object WindowJoinUtil {
106107
joinInfo.pairs().foreach { p =>
107108
if (!windowStartEqualityLeftKeys.contains(p.source) &&
108109
!windowEndEqualityLeftKeys.contains(p.source)) {
109-
val leftFieldType = joinFieldsType.get(p.source).getType
110+
val leftFieldType = leftChildFieldsType.get(p.source).getType
110111
val leftInputRef = new RexInputRef(p.source, leftFieldType)
112+
val rightFieldType = rightChildFieldsType.get(p.target).getType
111113
val rightIndex = leftFieldCnt + p.target
112-
val rightFieldType = joinFieldsType.get(rightIndex).getType
113114
val rightInputRef = new RexInputRef(rightIndex, rightFieldType)
114115
val remainEqual = rexBuilder.makeCall(
115116
SqlStdOperatorTable.EQUALS,

0 commit comments

Comments
 (0)