Skip to content

Commit d159ec7

Browse files
beyond1920zhaoxing
authored andcommitted
[hotfix][table-planner-blink] Fix bug for window join: plan is wrong if join condition contains 'IS NOT DISTINCT FROM'
Fix Flink-22098 caused by a mistake when rebasing This closes apache#15695
1 parent 95245e9 commit d159ec7

File tree

2 files changed

+26
-20
lines changed

2 files changed

+26
-20
lines changed

flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -86,43 +86,49 @@ object WindowJoinUtil {
8686
windowEndEqualityRightKeys) =
8787
excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(join)
8888

89-
val joinInfo = join.analyzeCondition()
89+
val joinSpec = JoinUtil.createJoinSpec(join)
9090
val (remainLeftKeys, remainRightKeys, remainCondition) = if (
9191
windowStartEqualityLeftKeys.nonEmpty || windowEndEqualityLeftKeys.nonEmpty) {
9292
val leftChildFieldsType = join.getLeft.getRowType.getFieldList
9393
val rightChildFieldsType = join.getRight.getRowType.getFieldList
9494
val leftFieldCnt = join.getLeft.getRowType.getFieldCount
9595
val rexBuilder = join.getCluster.getRexBuilder
96-
val remainEquals = mutable.ArrayBuffer[RexNode]()
96+
val remainingConditions = mutable.ArrayBuffer[RexNode]()
9797
val remainLeftKeysArray = mutable.ArrayBuffer[Int]()
9898
val remainRightKeysArray = mutable.ArrayBuffer[Int]()
9999
// convert remain pairs to RexInputRef tuple for building SqlStdOperatorTable.EQUALS calls
100100
// or SqlStdOperatorTable.IS_NOT_DISTINCT_FROM
101-
joinInfo.pairs().foreach { p =>
102-
if (!windowStartEqualityLeftKeys.contains(p.source) &&
103-
!windowEndEqualityLeftKeys.contains(p.source)) {
104-
val leftFieldType = leftChildFieldsType.get(p.source).getType
105-
val leftInputRef = new RexInputRef(p.source, leftFieldType)
106-
val rightFieldType = rightChildFieldsType.get(p.target).getType
107-
val rightIndex = leftFieldCnt + p.target
101+
joinSpec.getLeftKeys.zip(joinSpec.getRightKeys).
102+
zip(joinSpec.getFilterNulls).foreach { case ((source, target), filterNull) =>
103+
if (!windowStartEqualityLeftKeys.contains(source) &&
104+
!windowEndEqualityLeftKeys.contains(source)) {
105+
val leftFieldType = leftChildFieldsType.get(source).getType
106+
val leftInputRef = new RexInputRef(source, leftFieldType)
107+
val rightFieldType = rightChildFieldsType.get(target).getType
108+
val rightIndex = leftFieldCnt + target
108109
val rightInputRef = new RexInputRef(rightIndex, rightFieldType)
109-
val remainEqual = rexBuilder.makeCall(
110-
SqlStdOperatorTable.EQUALS,
111-
leftInputRef,
112-
rightInputRef)
113-
remainEquals.add(remainEqual)
114-
remainLeftKeysArray.add(p.source)
115-
remainRightKeysArray.add(p.target)
110+
val op = if (filterNull) {
111+
SqlStdOperatorTable.EQUALS
112+
} else {
113+
SqlStdOperatorTable.IS_NOT_DISTINCT_FROM
114+
}
115+
val remainEqual = rexBuilder.makeCall(op, leftInputRef, rightInputRef)
116+
remainingConditions += remainEqual
117+
remainLeftKeysArray += source
118+
remainRightKeysArray += target
116119
}
117120
}
118-
val remainAnds = remainEquals ++ joinInfo.nonEquiConditions
121+
val notEquiCondition = joinSpec.getNonEquiCondition
122+
if (notEquiCondition.isPresent) {
123+
remainingConditions += notEquiCondition.get()
124+
}
119125
(
120126
remainLeftKeysArray.toArray,
121127
remainRightKeysArray.toArray,
122128
// build a new condition
123-
RexUtil.composeConjunction(rexBuilder, remainAnds.toList))
129+
RexUtil.composeConjunction(rexBuilder, remainingConditions.toList))
124130
} else {
125-
(joinInfo.leftKeys.toIntArray, joinInfo.rightKeys.toIntArray, join.getCondition)
131+
(joinSpec.getLeftKeys, joinSpec.getRightKeys, join.getCondition)
126132
}
127133

128134
(

flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], cnt
537537
</Resource>
538538
<Resource name="optimized rel plan">
539539
<![CDATA[
540-
WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0])
540+
WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[IS NOT DISTINCT FROM(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0])
541541
:- Exchange(distribution=[hash[a]])
542542
: +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
543543
: +- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[a, COUNT(count1$0) AS cnt, COUNT(distinct$0 count$1) AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])

0 commit comments

Comments
 (0)