diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 582ba8b5dc6f8..e0e9a0f1686b8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -200,8 +200,8 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
createNewNode(agg, children, providedTrait, requiredTrait, requester)
case window: StreamPhysicalGroupWindowAggregateBase =>
- // WindowAggregate and WindowTableAggregate support insert-only in input
- val children = visitChildren(window, ModifyKindSetTrait.INSERT_ONLY)
+ // WindowAggregate and WindowTableAggregate support all changes in input
+ val children = visitChildren(window, ModifyKindSetTrait.ALL_CHANGES)
val builder = ModifyKindSet.newBuilder()
.addContainedKind(ModifyKind.INSERT)
if (window.emitStrategy.produceUpdates) {
@@ -470,20 +470,20 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
_: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
- _: StreamPhysicalPythonGroupTableAggregate =>
- // Aggregate, TableAggregate and Limit requires update_before if there are updates
+ _: StreamPhysicalPythonGroupTableAggregate |
+ _: StreamPhysicalGroupWindowAggregateBase =>
+ // Aggregate, TableAggregate, Limit and GroupWindowAggregate require update_before if
+ // there are updates
val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
val children = visitChildren(rel, requiredChildTrait)
// use requiredTrait as providedTrait, because they should support all kinds of UpdateKind
createNewNode(rel, children, requiredTrait)
- case _: StreamPhysicalGroupWindowAggregate | _: StreamPhysicalGroupWindowTableAggregate |
- _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank |
+ case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank |
_: StreamPhysicalDeduplicate | _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
_: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
- _: StreamPhysicalPythonGroupWindowAggregate | _: StreamPhysicalPythonOverAggregate |
- _: StreamPhysicalWindowJoin =>
- // WindowAggregate, WindowAggregate, WindowTableAggregate, Deduplicate, TemporalSort, CEP,
+ _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin =>
+ // WindowAggregate, WindowTableAggregate, Deduplicate, TemporalSort, CEP,
// OverAggregate, IntervalJoin, and WindowJoin require nothing about UpdateKind.
val children = visitChildren(rel, UpdateKindTrait.NONE)
createNewNode(rel, children, requiredTrait)
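
The two hunks above are the whole behavioral change: GroupWindowAggregate (and its table/Python variants) now advertises ModifyKindSetTrait.ALL_CHANGES for its input, and joins the operators that request UPDATE_BEFORE messages only when the input actually contains updates. A condensed sketch of that logic follows; the enum and helper names mirror FlinkChangelogModeInferenceProgram, but the surrounding visitor machinery is elided, so treat this as illustration rather than the planner's real code.

object ChangelogInferenceSketch {

  // The kinds of changes a node may emit; mirrors the planner's ModifyKind enum.
  sealed trait ModifyKind
  case object INSERT extends ModifyKind
  case object UPDATE extends ModifyKind
  case object DELETE extends ModifyKind

  final case class ModifyKindSet(kinds: Set[ModifyKind]) {
    def contains(kind: ModifyKind): Boolean = kinds.contains(kind)
  }

  // Before this patch GroupWindowAggregate required INSERT_ONLY from its children;
  // now it accepts ALL_CHANGES, i.e. a full changelog with updates and deletes.
  val INSERT_ONLY: ModifyKindSet = ModifyKindSet(Set(INSERT))
  val ALL_CHANGES: ModifyKindSet = ModifyKindSet(Set(INSERT, UPDATE, DELETE))

  // The two UpdateKind answers relevant to the second hunk.
  sealed trait UpdateKindTrait
  case object NONE extends UpdateKindTrait
  case object BEFORE_AND_AFTER extends UpdateKindTrait

  // Mirrors the beforeAfterOrNone helper: demand UPDATE_BEFORE/UPDATE_AFTER
  // pairs only if the child can actually produce updates.
  def beforeAfterOrNone(childModifyKindSet: ModifyKindSet): UpdateKindTrait =
    if (childModifyKindSet.contains(UPDATE)) BEFORE_AND_AFTER else NONE
}
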
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
index 71358b503a2b0..75a712f2ad1bf 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
@@ -69,6 +69,39 @@ Calc(select=[a, 3:BIGINT AS $1])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[a, b, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+  <!-- new TestCase: window aggregate on deduplicated (last-row) input; resources elided -->
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 7dab054bd0779..d8589fc0704fc 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -594,6 +594,30 @@ Union(all=[true], union=[b, ts, a], changelogMode=[I,UA,D])
+- GroupAggregate(groupBy=[a], select=[a, MAX(ts) AS t, MAX(b) AS b], changelogMode=[I,UA])
+- Exchange(distribution=[hash[a]], changelogMode=[I])
+- TableSourceScan(table=[[default_catalog, default_database, append_src]], fields=[ts, a, b], changelogMode=[I])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUpsertSourceWithComputedColumnAndWatermark">
+    <Resource name="sql">
+      <![CDATA[SELECT a, b, c FROM src WHERE a > 1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$1], b=[$2], c=[$3])
++- LogicalFilter(condition=[>($1, 1)])
+ +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 1000:INTERVAL SECOND)])
+ +- LogicalProject(id=[$0], a=[$1], b=[+($1, 1)], c=[$2], ts=[TO_TIMESTAMP($2)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
++- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
+ +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
+ +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
]]>
@@ -618,27 +642,28 @@ Calc(select=[id, Reinterpret(TO_TIMESTAMP(c)) AS ts], changelogMode=[I,UA,D])
]]>
-  <TestCase name="testUpsertSourceWithComputedColumnAndWatermark">
+  <TestCase name="testWindowAggregateOnChangelogSource">
-      <![CDATA[SELECT a, b, c FROM src WHERE a > 1]]>
+      <![CDATA[SELECT TUMBLE_START(ts, INTERVAL '10' SECOND), COUNT(*)
+FROM src
+GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)]]>
-      <![CDATA[
-LogicalProject(a=[$1], b=[$2], c=[$3])
--+- LogicalFilter(condition=[>($1, 1)])
- +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 1000:INTERVAL SECOND)])
- +- LogicalProject(id=[$0], a=[$1], b=[+($1, 1)], c=[$2], ts=[TO_TIMESTAMP($2)])
- +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+      <![CDATA[
+LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[$1])
++- LogicalAggregate(group=[{0}], EXPR$1=[COUNT()])
+ +- LogicalProject($f0=[$TUMBLE(PROCTIME(), 10000:INTERVAL SECOND)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
-      <![CDATA[
-Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
-+- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
- +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
- +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
- +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
- +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
+Calc(select=[w$start AS EXPR$0, EXPR$1], changelogMode=[I])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 10000)], properties=[w$start, w$end, w$proctime], select=[COUNT(*) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, proctime('w$) AS w$proctime], changelogMode=[I])
+ +- Exchange(distribution=[single], changelogMode=[I,UB,UA])
+ +- TableSourceScan(table=[[default_catalog, default_database, src, project=[]]], fields=[], changelogMode=[I,UB,UA])
]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.xml
index 0f330752ac661..1c1a42f45d015 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.xml
@@ -131,32 +131,6 @@ Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
+- Exchange(distribution=[single])
+- Calc(select=[proctime, c, CAST(a) AS a])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
-]]>
@@ -202,6 +176,53 @@ GroupWindowAggregate(window=[TumblingGroupWindow('w$, proctime, 3024000000)], se
+- Exchange(distribution=[single])
+- Calc(select=[proctime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
@@ -549,53 +570,6 @@ Calc(select=[EXPR$0])
+- Exchange(distribution=[hash[a, b, c]])
+- Calc(select=[a, b, c, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
-]]>
@@ -619,11 +593,19 @@ GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 1000)], select=[SUM(a
]]>
+  <TestCase name="testWindowAggregateOnRetractStream">
@@ -632,45 +614,74 @@ GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND)
LogicalProject(EXPR$0=[TUMBLE_START($0)], cnt=[$1])
+- LogicalAggregate(group=[{0}], cnt=[COUNT()])
+- LogicalProject($f0=[$TUMBLE($4, 1000:INTERVAL SECOND)])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[=($5, 1)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rowNum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $4 DESC NULLS LAST)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
+    <Resource name="sql">
+      <![CDATA[
+SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt
+FROM (
+  SELECT a, b, c, rowtime
+  FROM (
+    SELECT *,
+      ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rowNum
+    FROM MyTable
+  )
+  WHERE rowNum = 1
+)
+GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND)
+]]>
@@ -718,6 +729,61 @@ Union(all=[true], union=[EXPR$0])
+- Calc(select=[1 AS EXPR$0])
+- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 7200000, 3600000)], select=[])
+- Reused(reference_id=[1])
+]]>
+  <TestCase name="testWindowAggregateOnUpsertSource">
+    <!-- sql, ast and optimized rel plan resources elided -->
+  </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala
index 6455b99101443..a6ec4e894f10a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala
@@ -105,9 +105,6 @@ class DeduplicateTest extends TableTestBase {
|GROUP BY b, TUMBLE(ts, INTERVAL '0.004' SECOND)
""".stripMargin
- thrown.expect(classOf[TableException])
- thrown.expectMessage("GroupWindowAggregate doesn't support consuming update " +
- "and delete changes which is produced by node Deduplicate(")
util.verifyExplain(windowSql)
}
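
The dropped expectation is the point of the change: keep-last-row deduplication emits an update stream, not an append stream, and GroupWindowAggregate can now consume it. For two versions of the same key, the Deduplicate operator produces a changelog along these lines (values are illustrative; the +I/-U/+U shorthand is Flink's RowKind notation):

  +I (b=1, ts=00:00.001)   first version becomes the last row
  -U (b=1, ts=00:00.001)   a newer version arrives: retract the old last row
  +U (b=1, ts=00:00.003)   emit the new last row

so the downstream window aggregate must handle UPDATE_BEFORE/UPDATE_AFTER pairs, which is exactly what the planner change at the top of this patch allows.
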
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index 4badad9a36084..46608a0b2927f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -592,7 +592,7 @@ class TableScanTest extends TableTestBase {
}
@Test
- def testUnsupportedWindowAggregateOnChangelogSource(): Unit = {
+ def testWindowAggregateOnChangelogSource(): Unit = {
util.addTable(
"""
|CREATE TABLE src (
@@ -610,10 +610,6 @@ class TableScanTest extends TableTestBase {
|FROM src
|GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)
|""".stripMargin
- thrown.expect(classOf[TableException])
- thrown.expectMessage(
- "GroupWindowAggregate doesn't support consuming update changes " +
- "which is produced by node TableSourceScan")
util.verifyRelPlan(query, ExplainDetail.CHANGELOG_MODE)
}
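
The CREATE TABLE body is truncated in the hunk above. For reference, a values-connector source with the 'I,UB,UA' changelog mode and PROCTIME()-based window column that the recorded plan shows would be declared roughly as follows; the column list here is an assumption for illustration, not the test's actual schema:

    util.addTable(
      """
        |CREATE TABLE src (
        |  ts AS PROCTIME(),
        |  a INT,
        |  b DOUBLE
        |) WITH (
        |  'connector' = 'values',
        |  'changelog-mode' = 'I,UB,UA'
        |)
        |""".stripMargin)
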
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala
index 42622b50be7ab..55627eb175aa8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala
@@ -456,4 +456,46 @@ class GroupWindowTest extends TableTestBase {
|""".stripMargin
util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
}
+
+ @Test
+ def testWindowAggregateOnRetractStream(): Unit = {
+ val sql =
+ """
+ |SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt
+ |FROM (
+ | SELECT a, b, c, rowtime
+ | FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rowNum
+ | FROM MyTable
+ | )
+ | WHERE rowNum = 1
+ |)
+ |GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND)
+ |""".stripMargin
+ util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
+ }
+
+ @Test
+ def testWindowAggregateOnUpsertSource(): Unit = {
+ util.addTable(
+ """
+ |CREATE TABLE src (
+ | ts AS PROCTIME(),
+ | a INT,
+ | b DOUBLE,
+ | PRIMARY KEY (a) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'UA,D'
+ |)
+ """.stripMargin)
+ val query =
+ """
+ |SELECT TUMBLE_START(ts, INTERVAL '10' SECOND), COUNT(*)
+ |FROM src
+ |GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)
+ |""".stripMargin
+ util.verifyRelPlan(query, ExplainDetail.CHANGELOG_MODE)
+ }
}
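
Both new tests should record the plan shape captured above in TableScanTest.xml: GroupWindowAggregate stays insert-only (changelogMode=[I]) even when its input carries updates, e.g.

    Calc(select=[w$start AS EXPR$0, EXPR$1], changelogMode=[I])
    +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 10000)], ..., changelogMode=[I])
       +- Exchange(distribution=[single], changelogMode=[I,UB,UA])
          +- TableSourceScan(..., changelogMode=[I,UB,UA])
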
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
index 11b706ac263c4..659c639a1f49f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
@@ -24,6 +24,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.internal.TableEnvironmentInternal
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.factories.TestValuesTableFactory.{changelogRow, registerData}
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.{ConcatDistinctAggFunction, WeightedAvg}
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_LATE_FIRE_DELAY, TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
@@ -32,16 +34,15 @@ import org.apache.flink.table.planner.runtime.utils._
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
-import org.junit.Test
+import org.junit.{Ignore, Test}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import java.time.{Duration, ZoneId}
+import java.lang.{Long => JLong}
+import java.time.{Duration, LocalDateTime, ZoneId, ZoneOffset}
import java.util
import java.util.concurrent.TimeUnit
-import org.apache.flink.table.planner.factories.TestValuesTableFactory
-
import scala.collection.JavaConversions._
@RunWith(classOf[Parameterized])
@@ -370,6 +371,141 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
assertEquals(expected.sorted.mkString("\n"), sink.getUpsertResults.sorted.mkString("\n"))
}
+ @Test
+ def testWindowAggregateOnUpsertSource(): Unit = {
+
+ def localDateTime(epochSecond: Long): LocalDateTime = {
+ LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC)
+ }
+
+ val upsertSourceCurrencyData = List(
+ changelogRow("+U", "Euro", "no1", JLong.valueOf(114L), localDateTime(1L)),
+ changelogRow("+U", "US Dollar", "no1", JLong.valueOf(102L), localDateTime(2L)),
+ changelogRow("+U", "Yen", "no1", JLong.valueOf(1L), localDateTime(3L)),
+ changelogRow("+U", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L)),
+ changelogRow("+U", "Euro", "no1", JLong.valueOf(118L), localDateTime(6L)),
+ changelogRow("+U", "US Dollar", "no1", JLong.valueOf(104L), localDateTime(4L)),
+ changelogRow("-D", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L)))
+
+ val upsertSourceDataId = registerData(upsertSourceCurrencyData)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE upsert_currency (
+ | currency STRING,
+ | currency_no STRING,
+ | rate BIGINT,
+ | currency_time TIMESTAMP(3),
+ | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,
+ | PRIMARY KEY(currency) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'UA,D',
+ | 'data-id' = '$upsertSourceDataId'
+ |)
+ |""".stripMargin)
+ val sql =
+ """
+ |SELECT
+ |currency,
+ |COUNT(1) AS cnt,
+ |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,
+ |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end
+ |FROM upsert_currency
+ |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND)
+ |""".stripMargin
+ val sink = new TestingAppendSink
+ tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+ env.execute()
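+ // The second Euro update (at 00:00:06) retracts the earlier Euro row from the
+ // [00:00, 00:05) window, and the RMB delete retracts its insert, which is why
+ // both of those keys expect cnt = 0 in that window below.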
+ val expected = Seq(
+ "Euro,0,1970-01-01T00:00,1970-01-01T00:00:05",
+ "US Dollar,1,1970-01-01T00:00,1970-01-01T00:00:05",
+ "Yen,1,1970-01-01T00:00,1970-01-01T00:00:05",
+ "RMB,0,1970-01-01T00:00,1970-01-01T00:00:05",
+ "Euro,1,1970-01-01T00:00:05,1970-01-01T00:00:10")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Ignore("FLINK-22680")
+ @Test
+ def testUnResolvedWindowAggregateOnUpsertSource(): Unit = {
+
+ def localDateTime(epochSecond: Long): LocalDateTime = {
+ LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC)
+ }
+
+ val upsertSourceCurrencyData = List(
+ changelogRow("+U", "Euro", "no1", JLong.valueOf(114L), localDateTime(1L)),
+ changelogRow("+U", "US Dollar", "no1", JLong.valueOf(102L), localDateTime(2L)),
+ changelogRow("+U", "Yen", "no1", JLong.valueOf(1L), localDateTime(3L)),
+ changelogRow("+U", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L)),
+ changelogRow("+U", "Euro", "no1", JLong.valueOf(118L), localDateTime(6L)),
+ changelogRow("+U", "US Dollar", "no1", JLong.valueOf(104L), localDateTime(4L)),
+ changelogRow("-D", "RMB", "no1", JLong.valueOf(702L), localDateTime(4L)))
+
+ val upsertSourceDataId = registerData(upsertSourceCurrencyData)
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE upsert_currency (
+ | currency STRING,
+ | currency_no STRING,
+ | rate BIGINT,
+ | currency_time TIMESTAMP(3),
+ | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,
+ | PRIMARY KEY(currency) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'UA,D',
+ | 'data-id' = '$upsertSourceDataId'
+ |)
+ |""".stripMargin)
+ val sql =
+ """
+ |SELECT
+ |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,
+ |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end,
+ |MAX(rate) AS max_rate
+ |FROM upsert_currency
+ |GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND)
+ |""".stripMargin
+ val sink = new TestingAppendSink
+ tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+ env.execute()
+ }
+
+ @Test
+ def testWindowAggregateOnRetractStream(): Unit = {
+ val sql =
+ """
+ |SELECT
+ |`string`,
+ |TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) as w_start,
+ |TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) as w_end,
+ |COUNT(1) AS cnt
+ |FROM
+ | (
+ | SELECT `string`, rowtime
+ | FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY `string` ORDER BY rowtime DESC) as rowNum
+ | FROM testTable
+ | )
+ | WHERE rowNum = 1
+ |)
+ |GROUP BY `string`, TUMBLE(rowtime, INTERVAL '0.005' SECOND)
+ |""".stripMargin
+ val sink = new TestingAppendSink
+ tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+ env.execute()
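+ // The cnt = 0 rows below are retraction effects: when ROW_NUMBER keeps a newer
+ // version of a key, the older version is retracted from the window it first entered.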
+ val expected = Seq(
+ "Hi,1970-01-01T00:00,1970-01-01T00:00:00.005,1",
+ "Hallo,1970-01-01T00:00,1970-01-01T00:00:00.005,1",
+ "Hello,1970-01-01T00:00,1970-01-01T00:00:00.005,0",
+ "Hello,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,1",
+ "Hello world,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,0",
+ "Hello world,1970-01-01T00:00:00.015,1970-01-01T00:00:00.020,1",
+ "null,1970-01-01T00:00:00.030,1970-01-01T00:00:00.035,1")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
@Test
def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
if (useTimestampLtz) {