Skip to content

Commit 9de1ca9

Browse files
committed
[FLINK-20487][table-planner-blink] Support to consume retractions for window aggregate operator
1 parent b9e576f commit 9de1ca9

File tree

8 files changed

+206
-55
lines changed

8 files changed

+206
-55
lines changed

flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,8 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
199199
createNewNode(agg, children, providedTrait, requiredTrait, requester)
200200

201201
case window: StreamPhysicalGroupWindowAggregateBase =>
202-
// WindowAggregate and WindowTableAggregate support insert-only in input
203-
val children = visitChildren(window, ModifyKindSetTrait.INSERT_ONLY)
202+
// WindowAggregate and WindowTableAggregate support all changes in input
203+
val children = visitChildren(window, ModifyKindSetTrait.ALL_CHANGES)
204204
val builder = ModifyKindSet.newBuilder()
205205
.addContainedKind(ModifyKind.INSERT)
206206
if (window.emitStrategy.produceUpdates) {
@@ -457,15 +457,15 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
457457

458458
case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
459459
_: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
460-
_: StreamPhysicalPythonGroupTableAggregate =>
460+
_: StreamPhysicalPythonGroupTableAggregate | _: StreamPhysicalGroupWindowAggregate |
461+
_: StreamPhysicalGroupWindowTableAggregate =>
461462
// Aggregate, TableAggregate and Limit requires update_before if there are updates
462463
val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
463464
val children = visitChildren(rel, requiredChildTrait)
464465
// use requiredTrait as providedTrait, because they should support all kinds of UpdateKind
465466
createNewNode(rel, children, requiredTrait)
466467

467-
case _: StreamPhysicalGroupWindowAggregate | _: StreamPhysicalGroupWindowTableAggregate |
468-
_: StreamPhysicalDeduplicate | _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
468+
case _: StreamPhysicalDeduplicate | _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
469469
_: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
470470
_: StreamPhysicalPythonGroupWindowAggregate | _: StreamPhysicalPythonOverAggregate =>
471471
// WindowAggregate, WindowTableAggregate, Deduplicate, TemporalSort, CEP, OverAggregate

flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,39 @@ Calc(select=[a, 3:BIGINT AS $1])
6969
+- Exchange(distribution=[hash[b]])
7070
+- Calc(select=[a, b, rowtime])
7171
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
72+
]]>
73+
</Resource>
74+
</TestCase>
75+
<TestCase name="testLastRowWithWindowOnRowtime">
76+
<Resource name="explain">
77+
<![CDATA[== Abstract Syntax Tree ==
78+
LogicalProject(b=[$0], EXPR$1=[$2], EXPR$2=[TUMBLE_START($1)])
79+
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[SUM($2)])
80+
+- LogicalProject(b=[$1], $f1=[$TUMBLE($2, 4:INTERVAL SECOND)], a=[$0])
81+
+- LogicalFilter(condition=[=($3, 1)])
82+
+- LogicalProject(a=[$0], b=[$1], ts=[$2], rowNum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC NULLS LAST)])
83+
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[$2])
84+
+- LogicalTableScan(table=[[default_catalog, default_database, T, source: [CollectionTableSource(a, b, ts)]]])
85+
86+
== Optimized Physical Plan ==
87+
Calc(select=[b, EXPR$1, w$start AS EXPR$2])
88+
+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, ts, 4)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, SUM(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
89+
+- Exchange(distribution=[hash[b]])
90+
+- Calc(select=[b, ts, a])
91+
+- Deduplicate(keep=[LastRow], key=[a], order=[ROWTIME])
92+
+- Exchange(distribution=[hash[a]])
93+
+- WatermarkAssigner(rowtime=[ts], watermark=[ts])
94+
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [CollectionTableSource(a, b, ts)]]], fields=[a, b, ts])
95+
96+
== Optimized Execution Plan ==
97+
Calc(select=[b, EXPR$1, w$start AS EXPR$2])
98+
+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, ts, 4)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, SUM(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
99+
+- Exchange(distribution=[hash[b]])
100+
+- Calc(select=[b, ts, a])
101+
+- Deduplicate(keep=[LastRow], key=[a], order=[ROWTIME])
102+
+- Exchange(distribution=[hash[a]])
103+
+- WatermarkAssigner(rowtime=[ts], watermark=[ts])
104+
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [CollectionTableSource(a, b, ts)]]], fields=[a, b, ts])
72105
]]>
73106
</Resource>
74107
</TestCase>

flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,30 @@ Union(all=[true], union=[b, ts, a], changelogMode=[I,UA,D])
594594
+- GroupAggregate(groupBy=[a], select=[a, MAX(ts) AS t, MAX(b) AS b], changelogMode=[I,UA])
595595
+- Exchange(distribution=[hash[a]], changelogMode=[I])
596596
+- TableSourceScan(table=[[default_catalog, default_database, append_src]], fields=[ts, a, b], changelogMode=[I])
597+
]]>
598+
</Resource>
599+
</TestCase>
600+
<TestCase name="testUpsertSourceWithComputedColumnAndWatermark">
601+
<Resource name="sql">
602+
<![CDATA[SELECT a, b, c FROM src WHERE a > 1]]>
603+
</Resource>
604+
<Resource name="ast">
605+
<![CDATA[
606+
LogicalProject(a=[$1], b=[$2], c=[$3])
607+
+- LogicalFilter(condition=[>($1, 1)])
608+
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 1000:INTERVAL SECOND)])
609+
+- LogicalProject(id=[$0], a=[$1], b=[+($1, 1)], c=[$2], ts=[TO_TIMESTAMP($2)])
610+
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
611+
]]>
612+
</Resource>
613+
<Resource name="optimized rel plan">
614+
<![CDATA[
615+
Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
616+
+- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
617+
+- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
618+
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
619+
+- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
620+
+- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
597621
]]>
598622
</Resource>
599623
</TestCase>
@@ -618,27 +642,28 @@ Calc(select=[id, Reinterpret(TO_TIMESTAMP(c)) AS ts], changelogMode=[I,UA,D])
618642
]]>
619643
</Resource>
620644
</TestCase>
621-
<TestCase name="testUpsertSourceWithComputedColumnAndWatermark">
645+
<TestCase name="testWindowAggregateOnChangelogSource">
622646
<Resource name="sql">
623-
<![CDATA[SELECT a, b, c FROM src WHERE a > 1]]>
647+
<![CDATA[
648+
SELECT TUMBLE_START(ts, INTERVAL '10' SECOND), COUNT(*)
649+
FROM src
650+
GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)
651+
]]>
624652
</Resource>
625653
<Resource name="ast">
626654
<![CDATA[
627-
LogicalProject(a=[$1], b=[$2], c=[$3])
628-
+- LogicalFilter(condition=[>($1, 1)])
629-
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 1000:INTERVAL SECOND)])
630-
+- LogicalProject(id=[$0], a=[$1], b=[+($1, 1)], c=[$2], ts=[TO_TIMESTAMP($2)])
631-
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
655+
LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[$1])
656+
+- LogicalAggregate(group=[{0}], EXPR$1=[COUNT()])
657+
+- LogicalProject($f0=[$TUMBLE(PROCTIME(), 10000:INTERVAL SECOND)])
658+
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
632659
]]>
633660
</Resource>
634661
<Resource name="optimized rel plan">
635662
<![CDATA[
636-
Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
637-
+- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
638-
+- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
639-
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
640-
+- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
641-
+- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
663+
Calc(select=[CAST(w$start) AS EXPR$0, EXPR$1], changelogMode=[I])
664+
+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 10000)], properties=[w$start, w$end, w$proctime], select=[COUNT(*) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, proctime('w$) AS w$proctime], changelogMode=[I])
665+
+- Exchange(distribution=[single], changelogMode=[I,UB,UA])
666+
+- TableSourceScan(table=[[default_catalog, default_database, src, project=[]]], fields=[], changelogMode=[I,UB,UA])
642667
]]>
643668
</Resource>
644669
</TestCase>

flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml

Lines changed: 68 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -642,35 +642,6 @@ Calc(select=[w$start AS EXPR$0, cnt], changelogMode=[I])
642642
+- Exchange(distribution=[single], changelogMode=[I])
643643
+- Calc(select=[rowtime], changelogMode=[I])
644644
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
645-
]]>
646-
</Resource>
647-
</TestCase>
648-
<TestCase name="testWindowGroupByOnConstant">
649-
<Resource name="sql">
650-
<![CDATA[
651-
SELECT COUNT(*),
652-
weightedAvg(c, a) AS wAvg,
653-
TUMBLE_START(rowtime, INTERVAL '15' MINUTE),
654-
TUMBLE_END(rowtime, INTERVAL '15' MINUTE)
655-
FROM MyTable
656-
GROUP BY 'a', TUMBLE(rowtime, INTERVAL '15' MINUTE)
657-
]]>
658-
</Resource>
659-
<Resource name="ast">
660-
<![CDATA[
661-
LogicalProject(EXPR$0=[$2], wAvg=[$3], EXPR$2=[TUMBLE_START($1)], EXPR$3=[TUMBLE_END($1)])
662-
+- LogicalAggregate(group=[{0, 1}], EXPR$0=[COUNT()], wAvg=[weightedAvg($2, $3)])
663-
+- LogicalProject($f0=[_UTF-16LE'a'], $f1=[$TUMBLE($4, 900000:INTERVAL MINUTE)], c=[$2], $f3=[CAST($0):BIGINT])
664-
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
665-
]]>
666-
</Resource>
667-
<Resource name="optimized exec plan">
668-
<![CDATA[
669-
Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
670-
+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
671-
+- Exchange(distribution=[single])
672-
+- Calc(select=[rowtime, c, CAST(a) AS a])
673-
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
674645
]]>
675646
</Resource>
676647
</TestCase>
@@ -718,6 +689,74 @@ Union(all=[true], union=[EXPR$0])
718689
+- Calc(select=[1 AS EXPR$0])
719690
+- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 7200000, 3600000)], select=[])
720691
+- Reused(reference_id=[1])
692+
]]>
693+
</Resource>
694+
</TestCase>
695+
<TestCase name="testWindowGroupByOnConstant">
696+
<Resource name="sql">
697+
<![CDATA[
698+
SELECT COUNT(*),
699+
weightedAvg(c, a) AS wAvg,
700+
TUMBLE_START(rowtime, INTERVAL '15' MINUTE),
701+
TUMBLE_END(rowtime, INTERVAL '15' MINUTE)
702+
FROM MyTable
703+
GROUP BY 'a', TUMBLE(rowtime, INTERVAL '15' MINUTE)
704+
]]>
705+
</Resource>
706+
<Resource name="ast">
707+
<![CDATA[
708+
LogicalProject(EXPR$0=[$2], wAvg=[$3], EXPR$2=[TUMBLE_START($1)], EXPR$3=[TUMBLE_END($1)])
709+
+- LogicalAggregate(group=[{0, 1}], EXPR$0=[COUNT()], wAvg=[weightedAvg($2, $3)])
710+
+- LogicalProject($f0=[_UTF-16LE'a'], $f1=[$TUMBLE($4, 900000:INTERVAL MINUTE)], c=[$2], $f3=[CAST($0):BIGINT])
711+
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
712+
]]>
713+
</Resource>
714+
<Resource name="optimized exec plan">
715+
<![CDATA[
716+
Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
717+
+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
718+
+- Exchange(distribution=[single])
719+
+- Calc(select=[rowtime, c, CAST(a) AS a])
720+
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
721+
]]>
722+
</Resource>
723+
</TestCase>
724+
<TestCase name="testWindowAggregateOnRetractStream">
725+
<Resource name="sql">
726+
<![CDATA[
727+
SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt
728+
FROM (
729+
SELECT a, b, c, rowtime
730+
FROM (
731+
SELECT *,
732+
ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rowNum
733+
FROM MyTable
734+
)
735+
WHERE rowNum = 1
736+
)
737+
GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND)
738+
]]>
739+
</Resource>
740+
<Resource name="ast">
741+
<![CDATA[
742+
LogicalProject(EXPR$0=[TUMBLE_START($0)], cnt=[$1])
743+
+- LogicalAggregate(group=[{0}], cnt=[COUNT()])
744+
+- LogicalProject($f0=[$TUMBLE($4, 1000:INTERVAL SECOND)])
745+
+- LogicalFilter(condition=[=($5, 1)])
746+
+- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rowNum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $4 DESC NULLS LAST)])
747+
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
748+
]]>
749+
</Resource>
750+
<Resource name="optimized rel plan">
751+
<![CDATA[
752+
Calc(select=[w$start AS EXPR$0, cnt], changelogMode=[I])
753+
+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 1000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I])
754+
+- Exchange(distribution=[single], changelogMode=[I,UB,UA,D])
755+
+- Calc(select=[rowtime], changelogMode=[I,UB,UA,D])
756+
+- Deduplicate(keep=[LastRow], key=[a], order=[ROWTIME], changelogMode=[I,UB,UA,D])
757+
+- Exchange(distribution=[hash[a]], changelogMode=[I])
758+
+- Calc(select=[a, rowtime], changelogMode=[I])
759+
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
721760
]]>
722761
</Resource>
723762
</TestCase>

flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,6 @@ class DeduplicateTest extends TableTestBase {
105105
|GROUP BY b, TUMBLE(ts, INTERVAL '0.004' SECOND)
106106
""".stripMargin
107107

108-
thrown.expect(classOf[TableException])
109-
thrown.expectMessage("GroupWindowAggregate doesn't support consuming update " +
110-
"and delete changes which is produced by node Deduplicate(")
111108
util.verifyExplain(windowSql)
112109
}
113110

flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@ class TableScanTest extends TableTestBase {
592592
}
593593

594594
@Test
595-
def testUnsupportedWindowAggregateOnChangelogSource(): Unit = {
595+
def testWindowAggregateOnChangelogSource(): Unit = {
596596
util.addTable(
597597
"""
598598
|CREATE TABLE src (
@@ -610,10 +610,6 @@ class TableScanTest extends TableTestBase {
610610
|FROM src
611611
|GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)
612612
|""".stripMargin
613-
thrown.expect(classOf[TableException])
614-
thrown.expectMessage(
615-
"GroupWindowAggregate doesn't support consuming update changes " +
616-
"which is produced by node TableSourceScan")
617613
util.verifyRelPlan(query, ExplainDetail.CHANGELOG_MODE)
618614
}
619615

flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,4 +456,23 @@ class WindowAggregateTest extends TableTestBase {
456456
|""".stripMargin
457457
util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
458458
}
459+
460+
@Test
461+
def testWindowAggregateOnRetractStream(): Unit = {
462+
val sql =
463+
"""
464+
|SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt
465+
|FROM (
466+
| SELECT a, b, c, rowtime
467+
| FROM (
468+
| SELECT *,
469+
| ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rowNum
470+
| FROM MyTable
471+
| )
472+
| WHERE rowNum = 1
473+
|)
474+
|GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND)
475+
|""".stripMargin
476+
util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
477+
}
459478
}

flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,48 @@ class WindowAggregateITCase(mode: StateBackendMode)
347347
assertEquals(expected.sorted, sink.getAppendResults.sorted)
348348
}
349349

350+
@Test
351+
def testWindowAggregateOnRetractStream(): Unit = {
352+
val stream = failingDataSource(data)
353+
.assignTimestampsAndWatermarks(
354+
new TimestampAndWatermarkWithOffset[(
355+
Long, Int, Double, Float, BigDecimal, String, String)](10L))
356+
val table =
357+
stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)
358+
tEnv.registerTable("T1", table)
359+
val sql =
360+
"""
361+
|SELECT
362+
|`string`,
363+
|TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) as w_start,
364+
|TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) as w_end,
365+
|COUNT(1) AS cnt
366+
|FROM
367+
| (
368+
| SELECT `string`, rowtime
369+
| FROM (
370+
| SELECT *,
371+
| ROW_NUMBER() OVER (PARTITION BY `string` ORDER BY rowtime DESC) as rowNum
372+
| FROM T1
373+
| )
374+
| WHERE rowNum = 1
375+
|)
376+
|GROUP BY `string`, TUMBLE(rowtime, INTERVAL '0.005' SECOND)
377+
|""".stripMargin
378+
val sink = new TestingAppendSink
379+
tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
380+
env.execute()
381+
val expected = Seq(
382+
"Hi,1970-01-01T00:00,1970-01-01T00:00:00.005,1",
383+
"Hallo,1970-01-01T00:00,1970-01-01T00:00:00.005,1",
384+
"Hello,1970-01-01T00:00,1970-01-01T00:00:00.005,0",
385+
"Hello,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,1",
386+
"Hello world,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,0",
387+
"Hello world,1970-01-01T00:00:00.015,1970-01-01T00:00:00.020,1",
388+
"null,1970-01-01T00:00:00.030,1970-01-01T00:00:00.035,1")
389+
assertEquals(expected.sorted, sink.getAppendResults.sorted)
390+
}
391+
350392
private def withLateFireDelay(tableConfig: TableConfig, interval: Time): Unit = {
351393
val intervalInMillis = interval.toMilliseconds
352394
val lateFireDelay: Duration = tableConfig.getConfiguration

0 commit comments

Comments (0)