From 36f21178850b4c2e1f08008a1d53243a319a4f5b Mon Sep 17 00:00:00 2001 From: Tartarus0zm Date: Tue, 6 Apr 2021 16:41:56 +0800 Subject: [PATCH 1/2] [FLINK-21923][table-planner-blink] Fix SplitAggregateRule not support both filter, count/sum, avg --- .../rules/logical/SplitAggregateRule.scala | 30 +++++++++++------- .../rules/logical/SplitAggregateRuleTest.xml | 31 +++++++++++++++++++ .../logical/SplitAggregateRuleTest.scala | 19 ++++++++++++ .../stream/sql/SplitAggregateITCase.scala | 23 ++++++++++++++ 4 files changed, 92 insertions(+), 11 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala index 8d8807b20d1df..f96c00f223535 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala @@ -27,7 +27,7 @@ import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery import org.apache.flink.table.planner.plan.nodes.FlinkRelNode import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate import org.apache.flink.table.planner.plan.utils.AggregateUtil.doAllAggSupportSplit -import org.apache.flink.table.planner.plan.utils.{ExpandUtil, WindowUtil} +import org.apache.flink.table.planner.plan.utils.{AggregateUtil, ExpandUtil, WindowUtil} import org.apache.calcite.plan.RelOptRule.{any, operand} import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} @@ -138,9 +138,11 @@ class SplitAggregateRule extends RelOptRule( val windowProps = fmq.getRelWindowProperties(agg.getInput) val isWindowAgg = WindowUtil.groupingContainsWindowStartEnd(agg.getGroupSet, windowProps) val isProctimeWindowAgg = isWindowAgg && 
!windowProps.isRowtime + // TableAggregate is not supported. See also FLINK-21923. + val isTableAgg = AggregateUtil.isTableAggregate(agg.getAggCallList) agg.partialFinalType == PartialFinalType.NONE && agg.containsDistinctCall() && - splitDistinctAggEnabled && isAllAggSplittable && !isProctimeWindowAgg + splitDistinctAggEnabled && isAllAggSplittable && !isProctimeWindowAgg && !isTableAgg } override def onMatch(call: RelOptRuleCall): Unit = { @@ -280,11 +282,15 @@ class SplitAggregateRule extends RelOptRule( } // STEP 2.3: construct partial aggregates - relBuilder.aggregate( - relBuilder.groupKey(fullGroupSet, ImmutableList.of[ImmutableBitSet](fullGroupSet)), + // Create aggregate node directly to avoid ClassCastException, + // Please see https://issues.apache.org/jira/browse/FLINK-21923 for more details. + val partialAggregate = FlinkLogicalAggregate.create( + relBuilder.build(), + fullGroupSet, + ImmutableList.of[ImmutableBitSet](fullGroupSet), newPartialAggCalls) - relBuilder.peek().asInstanceOf[FlinkLogicalAggregate] - .setPartialFinalType(PartialFinalType.PARTIAL) + partialAggregate.setPartialFinalType(PartialFinalType.PARTIAL) + relBuilder.push(partialAggregate) // STEP 3: construct final aggregates val finalAggInputOffset = fullGroupSet.cardinality @@ -306,13 +312,15 @@ class SplitAggregateRule extends RelOptRule( needMergeFinalAggOutput = true } } - relBuilder.aggregate( - relBuilder.groupKey( - SplitAggregateRule.remap(fullGroupSet, originalAggregate.getGroupSet), - SplitAggregateRule.remap(fullGroupSet, Seq(originalAggregate.getGroupSet))), + // Create aggregate node directly to avoid ClassCastException, + // Please see https://issues.apache.org/jira/browse/FLINK-21923 for more details. 
+ val finalAggregate = FlinkLogicalAggregate.create( + relBuilder.build(), + SplitAggregateRule.remap(fullGroupSet, originalAggregate.getGroupSet), + SplitAggregateRule.remap(fullGroupSet, Seq(originalAggregate.getGroupSet)), finalAggCalls) - val finalAggregate = relBuilder.peek().asInstanceOf[FlinkLogicalAggregate] finalAggregate.setPartialFinalType(PartialFinalType.FINAL) + relBuilder.push(finalAggregate) // STEP 4: convert final aggregation output to the original aggregation output. // For example, aggregate function AVG is transformed to SUM0 and COUNT, so the output of diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml index 3895ee02ee624..efe5bc645238e 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml @@ -430,4 +430,35 @@ FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)]) ]]> + + + + + + ($1, 2))], $f3=[IS TRUE(<>($1, 5))]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +]]> + + + (b, 2)) AS $f2, IS TRUE(<>(b, 5)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4]) + +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + + diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala index 
4dbce13bc6405..d809dc47de1aa 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala @@ -186,4 +186,23 @@ class SplitAggregateRuleTest extends TableTestBase { |""".stripMargin util.verifyRelPlan(sqlQuery) } + + @Test + def testAggFilterClauseBothWithAvgAndCount(): Unit = { + util.tableEnv.getConfig.getConfiguration.setBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true) + val sqlQuery = + s""" + |SELECT + | a, + | COUNT(DISTINCT b) FILTER (WHERE NOT b = 2), + | SUM(b) FILTER (WHERE NOT b = 5), + | COUNT(b), + | AVG(b), + | SUM(b) + |FROM MyTable + |GROUP BY a + |""".stripMargin + util.verifyRelPlan(sqlQuery) + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala index d79931893c981..804c8321441cc 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala @@ -412,6 +412,29 @@ class SplitAggregateITCase( val expected = List("1,2,1,2,1", "2,4,3,4,3", "3,1,1,null,5", "4,2,2,6,5") assertEquals(expected.sorted, sink.getRetractResults.sorted) } + + @Test + def testAggFilterClauseBothWithAvgAndCount(): Unit = { + val t1 = tEnv.sqlQuery( + s""" + |SELECT + | a, + | COUNT(DISTINCT b) FILTER (WHERE NOT b = 2), + | SUM(b) FILTER (WHERE NOT b = 5), + | COUNT(b), + | SUM(b), + | AVG(b) + |FROM T + |GROUP BY a + """.stripMargin) + + val sink = new TestingRetractSink + 
t1.toRetractStream[Row].addSink(sink) + env.execute() + + val expected = List("1,1,3,2,3,1", "2,3,24,8,29,3", "3,1,null,2,10,5", "4,2,6,4,21,5") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } } object SplitAggregateITCase { From cfcfbee0f5d40d872158234d4a9f0d79ab038a5e Mon Sep 17 00:00:00 2001 From: Tartarus0zm Date: Sun, 25 Apr 2021 22:12:07 +0800 Subject: [PATCH 2/2] [FLINK-21923][table-planner-blink] add TODO reuse aggregate function for SplitAggregateRule --- .../table/planner/plan/rules/logical/SplitAggregateRule.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala index f96c00f223535..3c114f80d4bfb 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala @@ -284,6 +284,7 @@ class SplitAggregateRule extends RelOptRule( // STEP 2.3: construct partial aggregates // Create aggregate node directly to avoid ClassCastException, // Please see https://issues.apache.org/jira/browse/FLINK-21923 for more details. + // TODO reuse aggregate function, see FLINK-22412 val partialAggregate = FlinkLogicalAggregate.create( relBuilder.build(), fullGroupSet, @@ -314,6 +315,7 @@ class SplitAggregateRule extends RelOptRule( } // Create aggregate node directly to avoid ClassCastException, // Please see https://issues.apache.org/jira/browse/FLINK-21923 for more details. + // TODO reuse aggregate function, see FLINK-22412 val finalAggregate = FlinkLogicalAggregate.create( relBuilder.build(), SplitAggregateRule.remap(fullGroupSet, originalAggregate.getGroupSet),