Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate
import org.apache.flink.table.planner.plan.utils.AggregateUtil.doAllAggSupportSplit
import org.apache.flink.table.planner.plan.utils.{ExpandUtil, WindowUtil}
import org.apache.flink.table.planner.plan.utils.{AggregateUtil, ExpandUtil, WindowUtil}

import org.apache.calcite.plan.RelOptRule.{any, operand}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
Expand Down Expand Up @@ -138,9 +138,11 @@ class SplitAggregateRule extends RelOptRule(
val windowProps = fmq.getRelWindowProperties(agg.getInput)
val isWindowAgg = WindowUtil.groupingContainsWindowStartEnd(agg.getGroupSet, windowProps)
val isProctimeWindowAgg = isWindowAgg && !windowProps.isRowtime
// TableAggregate is not supported. see also FLINK-21923.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could not find any explanation in FLINK-21923 of why TableAggregate is not supported.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a comment on FLINK-21923.
If the aggregate is a TableAggregate, a ClassCastException will definitely occur.

val isTableAgg = AggregateUtil.isTableAggregate(agg.getAggCallList)

agg.partialFinalType == PartialFinalType.NONE && agg.containsDistinctCall() &&
splitDistinctAggEnabled && isAllAggSplittable && !isProctimeWindowAgg
splitDistinctAggEnabled && isAllAggSplittable && !isProctimeWindowAgg && !isTableAgg
}

override def onMatch(call: RelOptRuleCall): Unit = {
Expand Down Expand Up @@ -280,11 +282,16 @@ class SplitAggregateRule extends RelOptRule(
}

// STEP 2.3: construct partial aggregates
relBuilder.aggregate(
relBuilder.groupKey(fullGroupSet, ImmutableList.of[ImmutableBitSet](fullGroupSet)),
// Create aggregate node directly to avoid ClassCastException,
// Please see https://issues.apache.org/jira/browse/FLINK-21923 for more details.
// TODO reuse aggregate function, see FLINK-22412
val partialAggregate = FlinkLogicalAggregate.create(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current approach does not consider the optimization of aggregate function reuse

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous version of the implementation used Calcite's aggregate function reuse optimization.
It seems that only AVG is translated into COUNT and SUM0. After discussing offline with @beyond1920, we chose to use the create method because it is more direct; otherwise, we would need to add a project to restore the columns.
Do you suggest that we still use Calcite and then rebuild the project?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a quick fix, I suggest we keep the current approach and create a new issue to track the optimization (please also add some comments here). You can take the optimization issue if you like.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reminder! I have created a new issue, FLINK-22412.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a TODO here (such as // TODO reuse aggregate function, see FLINK-22412)
and add some tests about window aggregate

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I will deal with it as soon as possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added TODO, please take a look again, thanks

relBuilder.build(),
fullGroupSet,
ImmutableList.of[ImmutableBitSet](fullGroupSet),
newPartialAggCalls)
relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
.setPartialFinalType(PartialFinalType.PARTIAL)
partialAggregate.setPartialFinalType(PartialFinalType.PARTIAL)
relBuilder.push(partialAggregate)

// STEP 3: construct final aggregates
val finalAggInputOffset = fullGroupSet.cardinality
Expand All @@ -306,13 +313,16 @@ class SplitAggregateRule extends RelOptRule(
needMergeFinalAggOutput = true
}
}
relBuilder.aggregate(
relBuilder.groupKey(
SplitAggregateRule.remap(fullGroupSet, originalAggregate.getGroupSet),
SplitAggregateRule.remap(fullGroupSet, Seq(originalAggregate.getGroupSet))),
// Create aggregate node directly to avoid ClassCastException,
// Please see https://issues.apache.org/jira/browse/FLINK-21923 for more details.
// TODO reuse aggregate function, see FLINK-22412
val finalAggregate = FlinkLogicalAggregate.create(
relBuilder.build(),
SplitAggregateRule.remap(fullGroupSet, originalAggregate.getGroupSet),
SplitAggregateRule.remap(fullGroupSet, Seq(originalAggregate.getGroupSet)),
finalAggCalls)
val finalAggregate = relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
finalAggregate.setPartialFinalType(PartialFinalType.FINAL)
relBuilder.push(finalAggregate)

// STEP 4: convert final aggregation output to the original aggregation output.
// For example, aggregate function AVG is transformed to SUM0 and COUNT, so the output of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,4 +430,35 @@ FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])
]]>
</Resource>
</TestCase>
<TestCase name="testAggFilterClauseBothWithAvgAndCount">
<Resource name="sql">
<![CDATA[
SELECT
a,
COUNT(DISTINCT b) FILTER (WHERE NOT b = 2),
SUM(b) FILTER (WHERE NOT b = 5),
COUNT(b),
AVG(b),
SUM(b)
FROM MyTable
GROUP BY a
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1) FILTER $2], EXPR$2=[SUM($1) FILTER $3], EXPR$3=[COUNT($1)], EXPR$4=[AVG($1)], EXPR$5=[SUM($1)])
+- LogicalProject(a=[$0], b=[$1], $f2=[IS TRUE(<>($1, 2))], $f3=[IS TRUE(<>($1, 5))])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
FlinkLogicalCalc(select=[a, $f1, $f2, $f3, CAST(IF(=($f5, 0:BIGINT), null:INTEGER, /($f4, $f5))) AS $f4, $f6])
+- FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)], agg#2=[$SUM0($4)], agg#3=[$SUM0($5)], agg#4=[$SUM0($6)], agg#5=[SUM($7)])
+- FlinkLogicalAggregate(group=[{0, 4}], agg#0=[COUNT(DISTINCT $1) FILTER $2], agg#1=[SUM($1) FILTER $3], agg#2=[COUNT($1)], agg#3=[$SUM0($1)], agg#4=[COUNT($1)], agg#5=[SUM($1)])
+- FlinkLogicalCalc(select=[a, b, IS TRUE(<>(b, 2)) AS $f2, IS TRUE(<>(b, 5)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
</Root>
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,23 @@ class SplitAggregateRuleTest extends TableTestBase {
|""".stripMargin
util.verifyRelPlan(sqlQuery)
}

@Test
def testAggFilterClauseBothWithAvgAndCount(): Unit = {
  // Turn on distinct-aggregate splitting before the query is planned.
  val plannerConf = util.tableEnv.getConfig.getConfiguration
  plannerConf.setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)

  // A filtered distinct COUNT next to filtered/unfiltered SUM, COUNT and AVG over the
  // same column; the resulting plan is checked by verifyRelPlan.
  util.verifyRelPlan(
    s"""
       |SELECT
       | a,
       | COUNT(DISTINCT b) FILTER (WHERE NOT b = 2),
       | SUM(b) FILTER (WHERE NOT b = 5),
       | COUNT(b),
       | AVG(b),
       | SUM(b)
       |FROM MyTable
       |GROUP BY a
       |""".stripMargin)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,29 @@ class SplitAggregateITCase(
val expected = List("1,2,1,2,1", "2,4,3,4,3", "3,1,1,null,5", "4,2,2,6,5")
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}

@Test
def testAggFilterClauseBothWithAvgAndCount(): Unit = {
  // A filtered distinct COUNT next to filtered/unfiltered SUM, COUNT and AVG over the
  // same column, executed end to end against table T.
  val query =
    s"""
       |SELECT
       | a,
       | COUNT(DISTINCT b) FILTER (WHERE NOT b = 2),
       | SUM(b) FILTER (WHERE NOT b = 5),
       | COUNT(b),
       | SUM(b),
       | AVG(b)
       |FROM T
       |GROUP BY a
       |""".stripMargin
  val resultTable = tEnv.sqlQuery(query)

  // Collect the retract stream produced by the query and run the job.
  val retractSink = new TestingRetractSink
  resultTable.toRetractStream[Row].addSink(retractSink)
  env.execute()

  // Both sides are sorted, so the comparison does not depend on emission order.
  val expectedRows = List("1,1,3,2,3,1", "2,3,24,8,29,3", "3,1,null,2,10,5", "4,2,6,4,21,5")
  assertEquals(expectedRows.sorted, retractSink.getRetractResults.sorted)
}
}

object SplitAggregateITCase {
Expand Down