Skip to content

Commit c9df5fe

Browse files
shuo.cs, wuchong
authored and committed
[FLINK-22304][table] Refactor some interfaces for TVF based window to improve the extendability
1 parent df301ed commit c9df5fe

File tree

15 files changed

+221
-188
lines changed

15 files changed

+221
-188
lines changed

flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java

Lines changed: 13 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,11 @@
3838
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
3939
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
4040
import org.apache.flink.table.runtime.operators.aggregate.window.LocalSlicingWindowAggOperator;
41+
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
42+
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
43+
import org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggCombiner;
4144
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
45+
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
4246
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
4347
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
4448
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
@@ -65,7 +69,6 @@ public class StreamExecLocalWindowAggregate extends StreamExecWindowAggregateBas
6569
private static final long WINDOW_AGG_MEMORY_RATIO = 100;
6670

6771
public static final String FIELD_NAME_WINDOWING = "windowing";
68-
public static final String FIELD_NAME_NAMED_WINDOW_PROPERTIES = "namedWindowProperties";
6972

7073
@JsonProperty(FIELD_NAME_GROUPING)
7174
private final int[] grouping;
@@ -139,14 +142,17 @@ protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
139142
final RowDataKeySelector selector =
140143
KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType));
141144

145+
PagedTypeSerializer<RowData> keySer =
146+
(PagedTypeSerializer<RowData>) selector.getProducedType().toSerializer();
147+
AbstractRowDataSerializer<RowData> valueSer = new RowDataSerializer(inputRowType);
148+
149+
WindowBuffer.LocalFactory bufferFactory =
150+
new RecordsWindowBuffer.LocalFactory(
151+
keySer, valueSer, new LocalAggCombiner.Factory(generatedAggsHandler));
152+
142153
final OneInputStreamOperator<RowData, RowData> localAggOperator =
143154
new LocalSlicingWindowAggOperator(
144-
selector,
145-
sliceAssigner,
146-
(PagedTypeSerializer<RowData>) selector.getProducedType().toSerializer(),
147-
new RowDataSerializer(inputRowType),
148-
generatedAggsHandler,
149-
shiftTimeZone);
155+
selector, sliceAssigner, bufferFactory, shiftTimeZone);
150156

151157
return ExecNodeUtil.createOneInputTransformation(
152158
inputTransform,

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java

Lines changed: 4 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -26,16 +26,10 @@
2626
import org.apache.flink.streaming.api.watermark.Watermark;
2727
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
2828
import org.apache.flink.table.data.RowData;
29-
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
3029
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
31-
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
3230
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
33-
import org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggRecordsCombiner;
34-
import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
3531
import org.apache.flink.table.runtime.operators.window.slicing.ClockService;
3632
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
37-
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
38-
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
3933

4034
import java.time.ZoneId;
4135
import java.util.TimeZone;
@@ -55,8 +49,7 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowDat
5549
private final RowDataKeySelector keySelector;
5650
private final SliceAssigner sliceAssigner;
5751
private final long windowInterval;
58-
private final WindowBuffer.Factory windowBufferFactory;
59-
private final WindowCombineFunction.LocalFactory combinerFactory;
52+
private final WindowBuffer.LocalFactory windowBufferFactory;
6053

6154
/**
6255
* The shift timezone of the window, if the proctime or rowtime type is TIMESTAMP_LTZ, the shift
@@ -88,29 +81,12 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowDat
8881
public LocalSlicingWindowAggOperator(
8982
RowDataKeySelector keySelector,
9083
SliceAssigner sliceAssigner,
91-
PagedTypeSerializer<RowData> keySer,
92-
AbstractRowDataSerializer<RowData> inputSer,
93-
GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
94-
ZoneId shiftTimezone) {
95-
this(
96-
keySelector,
97-
sliceAssigner,
98-
new RecordsWindowBuffer.Factory(keySer, inputSer),
99-
new LocalAggRecordsCombiner.Factory(genAggsHandler, keySer),
100-
shiftTimezone);
101-
}
102-
103-
public LocalSlicingWindowAggOperator(
104-
RowDataKeySelector keySelector,
105-
SliceAssigner sliceAssigner,
106-
WindowBuffer.Factory windowBufferFactory,
107-
WindowCombineFunction.LocalFactory combinerFactory,
84+
WindowBuffer.LocalFactory windowBufferFactory,
10885
ZoneId shiftTimezone) {
10986
this.keySelector = keySelector;
11087
this.sliceAssigner = sliceAssigner;
11188
this.windowInterval = sliceAssigner.getSliceEndInterval();
11289
this.windowBufferFactory = windowBufferFactory;
113-
this.combinerFactory = combinerFactory;
11490
this.shiftTimezone = shiftTimezone;
11591
this.useDayLightSaving = TimeZone.getTimeZone(shiftTimezone).useDaylightTime();
11692
}
@@ -123,14 +99,13 @@ public void open() throws Exception {
12399
collector = new TimestampedCollector<>(output);
124100
collector.eraseTimestamp();
125101

126-
final WindowCombineFunction localCombiner =
127-
combinerFactory.create(getRuntimeContext(), collector);
128102
this.windowBuffer =
129103
windowBufferFactory.create(
130104
getContainingTask(),
131105
getContainingTask().getEnvironment().getMemoryManager(),
132106
computeMemorySize(),
133-
localCombiner,
107+
getRuntimeContext(),
108+
collector,
134109
shiftTimezone);
135110
}
136111

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java

Lines changed: 16 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -18,16 +18,15 @@
1818

1919
package org.apache.flink.table.runtime.operators.aggregate.window;
2020

21-
import org.apache.flink.api.common.typeutils.TypeSerializer;
2221
import org.apache.flink.table.data.RowData;
2322
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
2423
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
2524
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
26-
import org.apache.flink.table.runtime.operators.aggregate.window.combines.AggRecordsCombiner;
27-
import org.apache.flink.table.runtime.operators.aggregate.window.combines.GlobalAggAccCombiner;
25+
import org.apache.flink.table.runtime.operators.aggregate.window.combines.AggCombiner;
26+
import org.apache.flink.table.runtime.operators.aggregate.window.combines.GlobalAggCombiner;
2827
import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor;
2928
import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceUnsharedWindowAggProcessor;
30-
import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
29+
import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
3130
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
3231
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners.HoppingSliceAssigner;
3332
import org.apache.flink.table.runtime.operators.window.slicing.SliceSharedAssigner;
@@ -64,7 +63,7 @@ public static SlicingWindowAggOperatorBuilder builder() {
6463
private SliceAssigner assigner;
6564
private AbstractRowDataSerializer<RowData> inputSerializer;
6665
private PagedTypeSerializer<RowData> keySerializer;
67-
private TypeSerializer<RowData> accSerializer;
66+
private AbstractRowDataSerializer<RowData> accSerializer;
6867
private GeneratedNamespaceAggsHandleFunction<Long> generatedAggregateFunction;
6968
private GeneratedNamespaceAggsHandleFunction<Long> localGeneratedAggregateFunction;
7069
private GeneratedNamespaceAggsHandleFunction<Long> globalGeneratedAggregateFunction;
@@ -95,7 +94,7 @@ public SlicingWindowAggOperatorBuilder assigner(SliceAssigner assigner) {
9594

9695
public SlicingWindowAggOperatorBuilder aggregate(
9796
GeneratedNamespaceAggsHandleFunction<Long> generatedAggregateFunction,
98-
TypeSerializer<RowData> accSerializer) {
97+
AbstractRowDataSerializer<RowData> accSerializer) {
9998
this.generatedAggregateFunction = generatedAggregateFunction;
10099
this.accSerializer = accSerializer;
101100
return this;
@@ -105,7 +104,7 @@ public SlicingWindowAggOperatorBuilder globalAggregate(
105104
GeneratedNamespaceAggsHandleFunction<Long> localGeneratedAggregateFunction,
106105
GeneratedNamespaceAggsHandleFunction<Long> globalGeneratedAggregateFunction,
107106
GeneratedNamespaceAggsHandleFunction<Long> stateGeneratedAggregateFunction,
108-
TypeSerializer<RowData> accSerializer) {
107+
AbstractRowDataSerializer<RowData> accSerializer) {
109108
this.localGeneratedAggregateFunction = localGeneratedAggregateFunction;
110109
this.globalGeneratedAggregateFunction = globalGeneratedAggregateFunction;
111110
this.generatedAggregateFunction = stateGeneratedAggregateFunction;
@@ -131,28 +130,31 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int indexOfCountStart) {
131130
checkNotNull(keySerializer);
132131
checkNotNull(accSerializer);
133132
checkNotNull(generatedAggregateFunction);
134-
final WindowBuffer.Factory bufferFactory =
135-
new RecordsWindowBuffer.Factory(keySerializer, inputSerializer);
136-
final WindowCombineFunction.Factory combinerFactory;
137-
if (localGeneratedAggregateFunction != null && globalGeneratedAggregateFunction != null) {
133+
134+
boolean isGlobalAgg =
135+
localGeneratedAggregateFunction != null && globalGeneratedAggregateFunction != null;
136+
137+
RecordsCombiner.Factory combinerFactory;
138+
if (isGlobalAgg) {
138139
combinerFactory =
139-
new GlobalAggAccCombiner.Factory(
140+
new GlobalAggCombiner.Factory(
140141
localGeneratedAggregateFunction,
141142
globalGeneratedAggregateFunction,
142143
keySerializer);
143144
} else {
144145
combinerFactory =
145-
new AggRecordsCombiner.Factory(
146+
new AggCombiner.Factory(
146147
generatedAggregateFunction, keySerializer, inputSerializer);
147148
}
149+
final WindowBuffer.Factory bufferFactory =
150+
new RecordsWindowBuffer.Factory(keySerializer, inputSerializer, combinerFactory);
148151

149152
final SlicingWindowProcessor<Long> windowProcessor;
150153
if (assigner instanceof SliceSharedAssigner) {
151154
windowProcessor =
152155
new SliceSharedWindowAggProcessor(
153156
generatedAggregateFunction,
154157
bufferFactory,
155-
combinerFactory,
156158
(SliceSharedAssigner) assigner,
157159
accSerializer,
158160
indexOfCountStart,
@@ -162,7 +164,6 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int indexOfCountStart) {
162164
new SliceUnsharedWindowAggProcessor(
163165
generatedAggregateFunction,
164166
bufferFactory,
165-
combinerFactory,
166167
(SliceUnsharedAssigner) assigner,
167168
accSerializer,
168169
shiftTimeZone);

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java

Lines changed: 65 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -18,16 +18,21 @@
1818

1919
package org.apache.flink.table.runtime.operators.aggregate.window.buffers;
2020

21+
import org.apache.flink.api.common.functions.RuntimeContext;
2122
import org.apache.flink.runtime.memory.MemoryManager;
23+
import org.apache.flink.runtime.state.KeyedStateBackend;
2224
import org.apache.flink.table.data.RowData;
23-
import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
25+
import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
26+
import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
27+
import org.apache.flink.table.runtime.operators.window.state.WindowState;
2428
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
2529
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
2630
import org.apache.flink.table.runtime.typeutils.WindowKeySerializer;
2731
import org.apache.flink.table.runtime.util.KeyValueIterator;
2832
import org.apache.flink.table.runtime.util.WindowKey;
2933
import org.apache.flink.table.runtime.util.collections.binary.BytesMap.LookupInfo;
3034
import org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap;
35+
import org.apache.flink.util.Collector;
3136

3237
import java.io.EOFException;
3338
import java.time.ZoneId;
@@ -41,7 +46,7 @@
4146
*/
4247
public final class RecordsWindowBuffer implements WindowBuffer {
4348

44-
private final WindowCombineFunction combineFunction;
49+
private final RecordsCombiner combineFunction;
4550
private final WindowBytesMultiMap recordsBuffer;
4651
private final WindowKey reuseWindowKey;
4752
private final AbstractRowDataSerializer<RowData> recordSerializer;
@@ -53,7 +58,7 @@ public RecordsWindowBuffer(
5358
Object operatorOwner,
5459
MemoryManager memoryManager,
5560
long memorySize,
56-
WindowCombineFunction combineFunction,
61+
RecordsCombiner combineFunction,
5762
PagedTypeSerializer<RowData> keySer,
5863
AbstractRowDataSerializer<RowData> inputSer,
5964
ZoneId shiftTimeZone) {
@@ -117,32 +122,84 @@ public void close() throws Exception {
117122
// Factory
118123
// ------------------------------------------------------------------------------------------
119124

120-
/** Factory to create {@link RecordsWindowBuffer}. */
125+
/** Factory to create {@link RecordsWindowBuffer} with {@link RecordsCombiner.Factory}. */
121126
public static final class Factory implements WindowBuffer.Factory {
122127

123128
private static final long serialVersionUID = 1L;
124129

125130
private final PagedTypeSerializer<RowData> keySer;
126131
private final AbstractRowDataSerializer<RowData> inputSer;
132+
private final RecordsCombiner.Factory factory;
127133

128134
public Factory(
129-
PagedTypeSerializer<RowData> keySer, AbstractRowDataSerializer<RowData> inputSer) {
135+
PagedTypeSerializer<RowData> keySer,
136+
AbstractRowDataSerializer<RowData> inputSer,
137+
RecordsCombiner.Factory combinerFactory) {
130138
this.keySer = keySer;
131139
this.inputSer = inputSer;
140+
this.factory = combinerFactory;
132141
}
133142

134143
@Override
135144
public WindowBuffer create(
136145
Object operatorOwner,
137146
MemoryManager memoryManager,
138147
long memorySize,
139-
WindowCombineFunction combineFunction,
140-
ZoneId shiftTimeZone) {
148+
RuntimeContext runtimeContext,
149+
WindowTimerService<Long> timerService,
150+
KeyedStateBackend<RowData> stateBackend,
151+
WindowState<Long> windowState,
152+
boolean isEventTime,
153+
ZoneId shiftTimeZone)
154+
throws Exception {
155+
RecordsCombiner combiner =
156+
factory.createRecordsCombiner(
157+
runtimeContext, timerService, stateBackend, windowState, isEventTime);
141158
return new RecordsWindowBuffer(
142159
operatorOwner,
143160
memoryManager,
144161
memorySize,
145-
combineFunction,
162+
combiner,
163+
keySer,
164+
inputSer,
165+
shiftTimeZone);
166+
}
167+
}
168+
169+
/** Factory to create {@link RecordsWindowBuffer} with {@link RecordsCombiner.LocalFactory}. */
170+
public static final class LocalFactory implements WindowBuffer.LocalFactory {
171+
172+
private static final long serialVersionUID = 1L;
173+
174+
private final PagedTypeSerializer<RowData> keySer;
175+
private final AbstractRowDataSerializer<RowData> inputSer;
176+
private final RecordsCombiner.LocalFactory localFactory;
177+
178+
public LocalFactory(
179+
PagedTypeSerializer<RowData> keySer,
180+
AbstractRowDataSerializer<RowData> inputSer,
181+
RecordsCombiner.LocalFactory localFactory) {
182+
this.keySer = keySer;
183+
this.inputSer = inputSer;
184+
this.localFactory = localFactory;
185+
}
186+
187+
@Override
188+
public WindowBuffer create(
189+
Object operatorOwner,
190+
MemoryManager memoryManager,
191+
long memorySize,
192+
RuntimeContext runtimeContext,
193+
Collector<RowData> collector,
194+
ZoneId shiftTimeZone)
195+
throws Exception {
196+
RecordsCombiner combiner =
197+
localFactory.createRecordsCombiner(runtimeContext, collector);
198+
return new RecordsWindowBuffer(
199+
operatorOwner,
200+
memoryManager,
201+
memorySize,
202+
combiner,
146203
keySer,
147204
inputSer,
148205
shiftTimeZone);

0 commit comments

Comments
 (0)