Skip to content

Commit 11130e7

Browse files
author
shuo.cs
committed
make BytesMap#getEntryIterator copy-aware
1 parent ff29853 commit 11130e7

File tree

11 files changed

+50
-125
lines changed

11 files changed

+50
-125
lines changed

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,9 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int indexOfCountStart) {
138138
if (isGlobalAgg) {
139139
combinerFactory =
140140
new GlobalAggCombiner.Factory(
141-
localGeneratedAggregateFunction,
142-
globalGeneratedAggregateFunction,
143-
keySerializer);
141+
localGeneratedAggregateFunction, globalGeneratedAggregateFunction);
144142
} else {
145-
combinerFactory =
146-
new AggCombiner.Factory(
147-
generatedAggregateFunction, keySerializer, inputSerializer);
143+
combinerFactory = new AggCombiner.Factory(generatedAggregateFunction);
148144
}
149145
final WindowBuffer.Factory bufferFactory =
150146
new RecordsWindowBuffer.Factory(keySerializer, inputSerializer, combinerFactory);

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.time.ZoneId;
3939
import java.util.Iterator;
4040

41+
import static org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
4142
import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
4243

4344
/**
@@ -51,6 +52,9 @@ public final class RecordsWindowBuffer implements WindowBuffer {
5152
private final WindowKey reuseWindowKey;
5253
private final AbstractRowDataSerializer<RowData> recordSerializer;
5354
private final ZoneId shiftTimeZone;
55+
// copy key and input record if necessary(e.g., heap state backend),
56+
// because key and record are reused.
57+
private final boolean requiresCopy;
5458

5559
private long minSliceEnd = Long.MAX_VALUE;
5660

@@ -61,13 +65,15 @@ public RecordsWindowBuffer(
6165
RecordsCombiner combineFunction,
6266
PagedTypeSerializer<RowData> keySer,
6367
AbstractRowDataSerializer<RowData> inputSer,
68+
boolean requiresCopy,
6469
ZoneId shiftTimeZone) {
6570
this.combineFunction = combineFunction;
6671
this.recordsBuffer =
6772
new WindowBytesMultiMap(
6873
operatorOwner, memoryManager, memorySize, keySer, inputSer.getArity());
6974
this.recordSerializer = inputSer;
7075
this.reuseWindowKey = new WindowKeySerializer(keySer).createInstance();
76+
this.requiresCopy = requiresCopy;
7177
this.shiftTimeZone = shiftTimeZone;
7278
}
7379

@@ -102,7 +108,7 @@ public void advanceProgress(long progress) throws Exception {
102108
public void flush() throws Exception {
103109
if (recordsBuffer.getNumKeys() > 0) {
104110
KeyValueIterator<WindowKey, Iterator<RowData>> entryIterator =
105-
recordsBuffer.getEntryIterator();
111+
recordsBuffer.getEntryIterator(requiresCopy);
106112
while (entryIterator.advanceNext()) {
107113
combineFunction.combine(entryIterator.getKey(), entryIterator.getValue());
108114
}
@@ -155,13 +161,15 @@ public WindowBuffer create(
155161
RecordsCombiner combiner =
156162
factory.createRecordsCombiner(
157163
runtimeContext, timerService, stateBackend, windowState, isEventTime);
164+
boolean requiresCopy = !isStateImmutableInStateBackend(stateBackend);
158165
return new RecordsWindowBuffer(
159166
operatorOwner,
160167
memoryManager,
161168
memorySize,
162169
combiner,
163170
keySer,
164171
inputSer,
172+
requiresCopy,
165173
shiftTimeZone);
166174
}
167175
}
@@ -202,6 +210,7 @@ public WindowBuffer create(
202210
combiner,
203211
keySer,
204212
inputSer,
213+
false,
205214
shiftTimeZone);
206215
}
207216
}

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggCombiner.java

Lines changed: 2 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.flink.table.runtime.operators.aggregate.window.combines;
2020

2121
import org.apache.flink.api.common.functions.RuntimeContext;
22-
import org.apache.flink.api.common.typeutils.TypeSerializer;
2322
import org.apache.flink.api.common.typeutils.base.LongSerializer;
2423
import org.apache.flink.runtime.state.KeyedStateBackend;
2524
import org.apache.flink.table.data.RowData;
@@ -37,7 +36,6 @@
3736
import java.util.Iterator;
3837

3938
import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
40-
import static org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
4139
import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
4240

4341
/**
@@ -58,15 +56,6 @@ public class AggCombiner implements RecordsCombiner {
5856
/** Function used to handle all aggregates. */
5957
private final NamespaceAggsHandleFunction<Long> aggregator;
6058

61-
/** Whether to copy key and input record, because key and record are reused. */
62-
private final boolean requiresCopy;
63-
64-
/** Serializer to copy key if required. */
65-
private final TypeSerializer<RowData> keySerializer;
66-
67-
/** Serializer to copy record if required. */
68-
private final TypeSerializer<RowData> recordSerializer;
69-
7059
/** Whether the operator works in event-time mode, used to indicate registering which timer. */
7160
private final boolean isEventTime;
7261

@@ -75,31 +64,18 @@ public AggCombiner(
7564
StateKeyContext keyContext,
7665
WindowValueState<Long> accState,
7766
NamespaceAggsHandleFunction<Long> aggregator,
78-
boolean requiresCopy,
79-
TypeSerializer<RowData> keySerializer,
80-
TypeSerializer<RowData> valueSerializer,
8167
boolean isEventTime) {
8268
this.timerService = timerService;
8369
this.keyContext = keyContext;
8470
this.accState = accState;
8571
this.aggregator = aggregator;
86-
this.requiresCopy = requiresCopy;
87-
this.keySerializer = keySerializer;
88-
this.recordSerializer = valueSerializer;
8972
this.isEventTime = isEventTime;
9073
}
9174

9275
@Override
9376
public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception {
9477
// step 0: set current key for states and timers
95-
final RowData key;
96-
if (requiresCopy) {
97-
// the incoming key is reused, we should copy it if state backend doesn't copy it
98-
key = keySerializer.copy(windowKey.getKey());
99-
} else {
100-
key = windowKey.getKey();
101-
}
102-
keyContext.setCurrentKey(key);
78+
keyContext.setCurrentKey(windowKey.getKey());
10379

10480
// step 1: get the accumulator for the current key and window
10581
Long window = windowKey.getWindow();
@@ -114,10 +90,6 @@ public void combine(WindowKey windowKey, Iterator<RowData> records) throws Excep
11490
// step 3: do accumulate
11591
while (records.hasNext()) {
11692
RowData record = records.next();
117-
if (requiresCopy) {
118-
// the incoming record is reused, we should copy it if state backend doesn't copy it
119-
record = recordSerializer.copy(record);
120-
}
12193
if (isAccumulateMsg(record)) {
12294
aggregator.accumulate(record);
12395
} else {
@@ -156,16 +128,9 @@ public static final class Factory implements RecordsCombiner.Factory {
156128
private static final long serialVersionUID = 1L;
157129

158130
private final GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler;
159-
private final TypeSerializer<RowData> keySerializer;
160-
private final TypeSerializer<RowData> valueSerializer;
161131

162-
public Factory(
163-
GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
164-
TypeSerializer<RowData> keySerializer,
165-
TypeSerializer<RowData> valueSerializer) {
132+
public Factory(GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler) {
166133
this.genAggsHandler = genAggsHandler;
167-
this.keySerializer = keySerializer;
168-
this.valueSerializer = valueSerializer;
169134
}
170135

171136
@Override
@@ -181,16 +146,12 @@ public RecordsCombiner createRecordsCombiner(
181146
aggregator.open(
182147
new PerWindowStateDataViewStore(
183148
stateBackend, LongSerializer.INSTANCE, runtimeContext));
184-
boolean requiresCopy = !isStateImmutableInStateBackend(stateBackend);
185149
WindowValueState<Long> windowValueState = (WindowValueState<Long>) windowState;
186150
return new AggCombiner(
187151
timerService,
188152
stateBackend::setCurrentKey,
189153
windowValueState,
190154
aggregator,
191-
requiresCopy,
192-
keySerializer,
193-
valueSerializer,
194155
isEventTime);
195156
}
196157
}

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner.java

Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.flink.table.runtime.operators.aggregate.window.combines;
2020

2121
import org.apache.flink.api.common.functions.RuntimeContext;
22-
import org.apache.flink.api.common.typeutils.TypeSerializer;
2322
import org.apache.flink.api.common.typeutils.base.LongSerializer;
2423
import org.apache.flink.runtime.state.KeyedStateBackend;
2524
import org.apache.flink.table.data.RowData;
@@ -36,7 +35,6 @@
3635
import java.time.ZoneId;
3736
import java.util.Iterator;
3837

39-
import static org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
4038
import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
4139

4240
/**
@@ -62,27 +60,17 @@ public class GlobalAggCombiner implements RecordsCombiner {
6260
/** Global aggregate function to handle global accumulator rows. */
6361
private final NamespaceAggsHandleFunction<Long> globalAggregator;
6462

65-
/** Whether to copy key and input record, because key and record are reused. */
66-
private final boolean requiresCopy;
67-
68-
/** Serializer to copy key if required. */
69-
private final TypeSerializer<RowData> keySerializer;
70-
7163
public GlobalAggCombiner(
7264
WindowTimerService<Long> timerService,
7365
StateKeyContext keyContext,
7466
WindowValueState<Long> accState,
7567
NamespaceAggsHandleFunction<Long> localAggregator,
76-
NamespaceAggsHandleFunction<Long> globalAggregator,
77-
boolean requiresCopy,
78-
TypeSerializer<RowData> keySerializer) {
68+
NamespaceAggsHandleFunction<Long> globalAggregator) {
7969
this.timerService = timerService;
8070
this.keyContext = keyContext;
8171
this.accState = accState;
8272
this.localAggregator = localAggregator;
8373
this.globalAggregator = globalAggregator;
84-
this.requiresCopy = requiresCopy;
85-
this.keySerializer = keySerializer;
8674
}
8775

8876
@Override
@@ -99,14 +87,7 @@ public void combine(WindowKey windowKey, Iterator<RowData> localAccs) throws Exc
9987

10088
private void combineAccumulator(WindowKey windowKey, RowData acc) throws Exception {
10189
// step 1: set current key for states and timers
102-
final RowData key;
103-
if (requiresCopy) {
104-
// the incoming key is reused, we should copy it if state backend doesn't copy it
105-
key = keySerializer.copy(windowKey.getKey());
106-
} else {
107-
key = windowKey.getKey();
108-
}
109-
keyContext.setCurrentKey(key);
90+
keyContext.setCurrentKey(windowKey.getKey());
11091
Long window = windowKey.getWindow();
11192

11293
// step2: merge acc into state
@@ -145,15 +126,12 @@ public static final class Factory implements RecordsCombiner.Factory {
145126

146127
private final GeneratedNamespaceAggsHandleFunction<Long> genLocalAggsHandler;
147128
private final GeneratedNamespaceAggsHandleFunction<Long> genGlobalAggsHandler;
148-
private final TypeSerializer<RowData> keySerializer;
149129

150130
public Factory(
151131
GeneratedNamespaceAggsHandleFunction<Long> genLocalAggsHandler,
152-
GeneratedNamespaceAggsHandleFunction<Long> genGlobalAggsHandler,
153-
TypeSerializer<RowData> keySerializer) {
132+
GeneratedNamespaceAggsHandleFunction<Long> genGlobalAggsHandler) {
154133
this.genLocalAggsHandler = genLocalAggsHandler;
155134
this.genGlobalAggsHandler = genGlobalAggsHandler;
156-
this.keySerializer = keySerializer;
157135
}
158136

159137
@Override
@@ -174,16 +152,13 @@ public RecordsCombiner createRecordsCombiner(
174152
globalAggregator.open(
175153
new PerWindowStateDataViewStore(
176154
stateBackend, LongSerializer.INSTANCE, runtimeContext));
177-
boolean requiresCopy = !isStateImmutableInStateBackend(stateBackend);
178155
WindowValueState<Long> windowValueState = (WindowValueState<Long>) windowState;
179156
return new GlobalAggCombiner(
180157
timerService,
181158
stateBackend::setCurrentKey,
182159
windowValueState,
183160
localAggregator,
184-
globalAggregator,
185-
requiresCopy,
186-
keySerializer);
161+
globalAggregator);
187162
}
188163
}
189164
}

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,7 @@ public WindowRankOperatorBuilder windowEndIndex(int windowEndIndex) {
138138
"Illegal window end index %s, it should not be negative!", windowEndIndex));
139139
final RecordsCombiner.Factory combinerFactory =
140140
new TopNRecordsCombiner.Factory(
141-
generatedSortKeyComparator,
142-
sortKeySelector,
143-
keySerializer,
144-
inputSerializer,
145-
rankEnd);
141+
generatedSortKeyComparator, sortKeySelector, inputSerializer, rankEnd);
146142
final WindowBuffer.Factory bufferFactory =
147143
new RecordsWindowBuffer.Factory(keySerializer, inputSerializer, combinerFactory);
148144
final SlicingWindowProcessor<Long> windowProcessor =

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import java.util.Map;
4242

4343
import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
44-
import static org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
4544

4645
/**
4746
* An implementation of {@link RecordsCombiner} that save topN records of incremental input records
@@ -67,12 +66,6 @@ public final class TopNRecordsCombiner implements RecordsCombiner {
6766
/** TopN size. */
6867
private final long topN;
6968

70-
/** Whether to copy input key, because key is reused. */
71-
private final boolean requiresCopyKey;
72-
73-
/** Serializer to copy key if required. */
74-
private final TypeSerializer<RowData> keySerializer;
75-
7669
/** Serializer to copy record if required. */
7770
private final TypeSerializer<RowData> recordSerializer;
7871

@@ -86,8 +79,6 @@ public TopNRecordsCombiner(
8679
Comparator<RowData> sortKeyComparator,
8780
KeySelector<RowData, RowData> sortKeySelector,
8881
long topN,
89-
boolean requiresCopyKey,
90-
TypeSerializer<RowData> keySerializer,
9182
TypeSerializer<RowData> recordSerializer,
9283
boolean isEventTime) {
9384
this.timerService = timerService;
@@ -96,8 +87,6 @@ public TopNRecordsCombiner(
9687
this.sortKeyComparator = sortKeyComparator;
9788
this.sortKeySelector = sortKeySelector;
9889
this.topN = topN;
99-
this.requiresCopyKey = requiresCopyKey;
100-
this.keySerializer = keySerializer;
10190
this.recordSerializer = recordSerializer;
10291
this.isEventTime = isEventTime;
10392
}
@@ -123,14 +112,7 @@ public void combine(WindowKey windowKey, Iterator<RowData> records) throws Excep
123112

124113
// step 2: flush data in TopNBuffer into state
125114
Iterator<Map.Entry<RowData, Collection<RowData>>> bufferItr = buffer.entrySet().iterator();
126-
final RowData key;
127-
if (requiresCopyKey) {
128-
// the incoming key is reused, we should copy it if state backend doesn't copy it
129-
key = keySerializer.copy(windowKey.getKey());
130-
} else {
131-
key = windowKey.getKey();
132-
}
133-
keyContext.setCurrentKey(key);
115+
keyContext.setCurrentKey(windowKey.getKey());
134116
Long window = windowKey.getWindow();
135117
while (bufferItr.hasNext()) {
136118
Map.Entry<RowData, Collection<RowData>> entry = bufferItr.next();
@@ -165,19 +147,16 @@ public static final class Factory implements RecordsCombiner.Factory {
165147
// The util to compare two sortKey equals to each other.
166148
private final GeneratedRecordComparator generatedSortKeyComparator;
167149
private final KeySelector<RowData, RowData> sortKeySelector;
168-
private final TypeSerializer<RowData> keySerializer;
169150
private final TypeSerializer<RowData> recordSerializer;
170151
private final long topN;
171152

172153
public Factory(
173154
GeneratedRecordComparator genSortKeyComparator,
174155
RowDataKeySelector sortKeySelector,
175-
TypeSerializer<RowData> keySerializer,
176156
TypeSerializer<RowData> recordSerializer,
177157
long topN) {
178158
this.generatedSortKeyComparator = genSortKeyComparator;
179159
this.sortKeySelector = sortKeySelector;
180-
this.keySerializer = keySerializer;
181160
this.recordSerializer = recordSerializer;
182161
this.topN = topN;
183162
}
@@ -192,7 +171,6 @@ public RecordsCombiner createRecordsCombiner(
192171
throws Exception {
193172
final Comparator<RowData> sortKeyComparator =
194173
generatedSortKeyComparator.newInstance(runtimeContext.getUserCodeClassLoader());
195-
boolean requiresCopyKey = !isStateImmutableInStateBackend(stateBackend);
196174
WindowMapState<Long, List<RowData>> windowMapState =
197175
(WindowMapState<Long, List<RowData>>) windowState;
198176
return new TopNRecordsCombiner(
@@ -202,8 +180,6 @@ public RecordsCombiner createRecordsCombiner(
202180
sortKeyComparator,
203181
sortKeySelector,
204182
topN,
205-
requiresCopyKey,
206-
keySerializer,
207183
recordSerializer,
208184
isEventTime);
209185
}

0 commit comments

Comments
 (0)