Skip to content

Commit e813ef5

Browse files
committed
Updates based on JingsongLi'comment
1 parent c1b50a8 commit e813ef5

File tree

4 files changed

+12
-32
lines changed

4 files changed

+12
-32
lines changed

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/JoinConditionWithNullFilters.java

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,16 @@
1818

1919
package org.apache.flink.table.runtime.operators.join;
2020

21-
import org.apache.flink.api.common.functions.AbstractRichFunction;
22-
import org.apache.flink.configuration.Configuration;
21+
import org.apache.flink.api.java.operators.translation.WrappingFunction;
22+
import org.apache.flink.streaming.api.operators.KeyContext;
2323
import org.apache.flink.table.data.RowData;
2424
import org.apache.flink.table.data.binary.NullAwareGetters;
2525
import org.apache.flink.table.runtime.generated.JoinCondition;
2626

2727
/** Utility to take null filters into consideration when apply join condition. */
28-
public class JoinConditionWithNullFilters extends AbstractRichFunction implements JoinCondition {
28+
public class JoinConditionWithNullFilters extends WrappingFunction<JoinCondition> implements JoinCondition {
2929

30-
private final JoinCondition backingJoinCondition;
30+
private static final long serialVersionUID = 1L;
3131

3232
/** Should filter null keys. */
3333
private final int[] nullFilterKeys;
@@ -38,42 +38,28 @@ public class JoinConditionWithNullFilters extends AbstractRichFunction implement
3838
/** Filter null to all keys. */
3939
private final boolean filterAllNulls;
4040

41-
private NullAwareGetters joinKey;
41+
private KeyContext keyContext;
4242

4343
public JoinConditionWithNullFilters(
44-
JoinCondition backingJoinCondition, boolean[] filterNullKeys) {
45-
this.backingJoinCondition = backingJoinCondition;
44+
JoinCondition backingJoinCondition, boolean[] filterNullKeys, KeyContext keyContext) {
45+
super(backingJoinCondition);
4646
this.nullFilterKeys = NullAwareJoinHelper.getNullFilterKeys(filterNullKeys);
4747
this.nullSafe = nullFilterKeys.length == 0;
4848
this.filterAllNulls = nullFilterKeys.length == filterNullKeys.length;
49-
}
50-
51-
@Override
52-
public void open(Configuration parameters) throws Exception {
53-
super.open(parameters);
54-
backingJoinCondition.open(parameters);
55-
}
56-
57-
@Override
58-
public void close() throws Exception {
59-
super.close();
60-
backingJoinCondition.close();
49+
this.keyContext = keyContext;
6150
}
6251

6352
@Override
6453
public boolean apply(RowData left, RowData right) {
6554
if (!nullSafe) { // is not null safe, return false if any null exists
6655
// key is always BinaryRowData
56+
NullAwareGetters joinKey = (NullAwareGetters) keyContext.getCurrentKey();
6757
if (filterAllNulls ? joinKey.anyNull() : joinKey.anyNull(nullFilterKeys)) {
6858
// find null present, return false directly
6959
return false;
7060
}
7161
}
7262
// test condition
73-
return backingJoinCondition.apply(left, right);
74-
}
75-
76-
public void setCurrentJoinKey(NullAwareGetters joinKey) {
77-
this.joinKey = joinKey;
63+
return wrappedFunction.apply(left, right);
7864
}
7965
}

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/AbstractStreamingJoinOperator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,10 @@ public AbstractStreamingJoinOperator(
8585
@Override
8686
public void open() throws Exception {
8787
super.open();
88-
8988
JoinCondition condition =
9089
generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
91-
condition.setRuntimeContext(getRuntimeContext());
92-
this.joinCondition = new JoinConditionWithNullFilters(condition, filterNullKeys);
90+
this.joinCondition = new JoinConditionWithNullFilters(condition, filterNullKeys, this);
91+
this.joinCondition.setRuntimeContext(getRuntimeContext());
9392
this.joinCondition.open(new Configuration());
9493

9594
this.collector = new TimestampedCollector<>(output);

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
2222
import org.apache.flink.table.data.GenericRowData;
2323
import org.apache.flink.table.data.RowData;
24-
import org.apache.flink.table.data.binary.NullAwareGetters;
2524
import org.apache.flink.table.data.util.RowDataUtil;
2625
import org.apache.flink.table.data.utils.JoinedRowData;
2726
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
@@ -208,7 +207,6 @@ private void processElement(
208207
RowKind inputRowKind = input.getRowKind();
209208
input.setRowKind(RowKind.INSERT); // erase RowKind for later state updating
210209

211-
joinCondition.setCurrentJoinKey((NullAwareGetters) getCurrentKey());
212210
AssociatedRecords associatedRecords =
213211
AssociatedRecords.of(input, inputIsLeft, otherSideStateView, joinCondition);
214212
if (isAccumulateMsg) { // record is accumulate

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperator.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
2222
import org.apache.flink.table.data.RowData;
23-
import org.apache.flink.table.data.binary.NullAwareGetters;
2423
import org.apache.flink.table.data.util.RowDataUtil;
2524
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
2625
import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
@@ -103,7 +102,6 @@ public void open() throws Exception {
103102
@Override
104103
public void processElement1(StreamRecord<RowData> element) throws Exception {
105104
RowData input = element.getValue();
106-
joinCondition.setCurrentJoinKey((NullAwareGetters) getCurrentKey());
107105
AssociatedRecords associatedRecords =
108106
AssociatedRecords.of(input, true, rightRecordStateView, joinCondition);
109107
if (associatedRecords.isEmpty()) {
@@ -168,7 +166,6 @@ public void processElement2(StreamRecord<RowData> element) throws Exception {
168166
RowKind inputRowKind = input.getRowKind();
169167
input.setRowKind(RowKind.INSERT); // erase RowKind for later state updating
170168

171-
joinCondition.setCurrentJoinKey((NullAwareGetters) getCurrentKey());
172169
AssociatedRecords associatedRecords =
173170
AssociatedRecords.of(input, false, leftRecordStateView, joinCondition);
174171
if (isAccumulateMsg) { // record is accumulate

0 commit comments

Comments
 (0)