Skip to content

Commit 43597eb

Browse files
wangyum authored and
GitHub Enterprise committed
[CARMEL-5933] Runtime Filter supports pruning side has window (#909)
* Runtime Filter supports pruning side has window * Update DynamicDataPruningSuite.scala
1 parent 8c2a7dd commit 43597eb

File tree

2 files changed

+79
-9
lines changed

2 files changed

+79
-9
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -190,19 +190,33 @@ object PartitionPruning extends Rule[LogicalPlan]
190190
}
191191
}
192192

193+
// Make sure injected filters could push through Shuffle, see PushPredicateThroughNonJoin.
//
// Walks `plan` top-down and reports whether a filter on `exp` would probably be pushed
// below one of the operators matched here. Returns true on reaching:
//  - a Join for which canPlanAsBroadcastHashJoin(j, conf) is false;
//  - an Aggregate with non-empty grouping and only deterministic aggregate expressions,
//    where `exp`, after alias substitution, references only the aggregate child's output;
//  - a Window whose partition spec consists solely of AttributeReferences and whose
//    partition attributes contain every reference of `exp`.
// A Project is looked through after substituting its aliases into `exp`. Any other node
// (including a Join/Aggregate/Window whose guard failed) recurses into each child whose
// output contains all references of `exp`; a leaf therefore yields false.
194+
private def probablyPushThroughShuffle(exp: Expression, plan: LogicalPlan): Boolean = {
195+
plan match {
196+
case j: Join if !canPlanAsBroadcastHashJoin(j, conf) => true
197+
case a @ Aggregate(groupingExps, aggExps, child)
198+
if aggExps.forall(_.deterministic) && groupingExps.nonEmpty &&
199+
replaceAlias(exp, getAliasMap(a)).references.subsetOf(child.outputSet) => true
200+
case w: Window
201+
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) &&
202+
exp.references.subsetOf(AttributeSet(w.partitionSpec.flatMap(_.references))) => true
203+
case p: Project =>
204+
probablyPushThroughShuffle(replaceAlias(exp, getAliasMap(p)), p.child)
205+
case other =>
206+
other.children.exists { p =>
207+
if (exp.references.subsetOf(p.outputSet)) probablyPushThroughShuffle(exp, p) else false
208+
}
209+
}
210+
}
211+
193212
private def dataPruningHasBenefit(
194213
prunRelation: LogicalRelation,
214+
exp: Expression,
195215
prunPlan: LogicalPlan,
196216
otherPlan: LogicalPlan,
197217
canBuildBroadcast: Boolean): Boolean = {
198218
if (canBuildBroadcast) {
199-
val shuffleStages = prunPlan.collect {
200-
case j @ Join(left, right, _, _, hint)
201-
if !canBroadcastBySize(left, SQLConf.get) && !canBroadcastBySize(right, SQLConf.get)
202-
&& !hintToBroadcastLeft(hint) && !hintToBroadcastRight(hint) => j
203-
case a: Aggregate => a
204-
}
205-
shuffleStages.exists(_.collectLeaves().exists(_.equals(prunRelation))) &&
219+
probablyPushThroughShuffle(exp, prunPlan) &&
206220
prunRelation.stats.sizeInBytes >= SQLConf.get.dynamicDataPruningSideThreshold
207221
} else {
208222
val estimatePruningSideSize =
@@ -251,7 +265,7 @@ object PartitionPruning extends Rule[LogicalPlan]
251265
canPruneLeft(joinType) &&
252266
supportDynamicPruning(right) &&
253267
(canBroadcastBySize(right, conf) || hintToBroadcastRight(hint)) &&
254-
dataPruningHasBenefit(scan.logicalRelation, left, right,
268+
dataPruningHasBenefit(scan.logicalRelation, l, left, right,
255269
canBuildBroadcastRight(joinType)) =>
256270
newLeft = insertDataPredicate(l, newLeft, r, right, rightKeys)
257271
case _ =>
@@ -269,7 +283,7 @@ object PartitionPruning extends Rule[LogicalPlan]
269283
canPruneRight(joinType) &&
270284
supportDynamicPruning(left) &&
271285
(canBroadcastBySize(left, conf) || hintToBroadcastLeft(hint)) &&
272-
dataPruningHasBenefit(scan.logicalRelation, right, left,
286+
dataPruningHasBenefit(scan.logicalRelation, r, right, left,
273287
canBuildBroadcastLeft(joinType)) =>
274288
newRight = insertDataPredicate(r, newRight, l, left, leftKeys)
275289
case _ =>

sql/core/src/test/scala/org/apache/spark/sql/DynamicDataPruningSuite.scala

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,62 @@ abstract class DynamicDataPruningSuiteBase
624624
}
625625
}
626626

627+
// Joins a DISTINCT subquery over t1 with t3 on column a (with t3.b < 2), using a 3K
// auto-broadcast threshold and a 10K pruning-side threshold, then asserts
// checkDataPruningPredicate(df, false, true) and the two expected result rows.
// NOTE(review): the meaning of the two boolean flags is defined by checkDataPruningPredicate,
// which is not shown here -- presumably the second one means "shuffle pruning expected";
// confirm against the helper.
test("Aggregate should triggers shuffle pruning") {
628+
withSQLConf(SQLConf.DYNAMIC_DATA_PRUNING_ENABLED.key -> "true",
629+
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "3K",
630+
SQLConf.DYNAMIC_DATA_PRUNING_SIDE_THRESHOLD.key -> "10K") {
631+
val df = sql(
632+
"""
633+
|SELECT t11.a,
634+
| t11.b
635+
|FROM (SELECT DISTINCT * FROM t1) t11
636+
| JOIN t3
637+
| ON t11.a = t3.a AND t3.b < 2
638+
|""".stripMargin)
639+
640+
checkDataPruningPredicate(df, false, true)
641+
checkAnswer(df, Row(0, 0) :: Row(1, 1) :: Nil)
642+
}
643+
}
644+
645+
// Joins t3 (with t3.b < 2) against a subquery that adds Row_number() OVER
// (PARTITION BY a ORDER BY b) to t1. The join key a is also the window partition column.
// Asserts checkDataPruningPredicate(df, false, true) and the two expected result rows.
// NOTE(review): the meaning of the two boolean flags is defined by checkDataPruningPredicate,
// which is not shown here -- presumably the second one means "shuffle pruning expected";
// confirm against the helper.
test("Window should triggers shuffle pruning") {
646+
withSQLConf(SQLConf.DYNAMIC_DATA_PRUNING_ENABLED.key -> "true",
647+
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "3K",
648+
SQLConf.DYNAMIC_DATA_PRUNING_SIDE_THRESHOLD.key -> "10K") {
649+
val df = sql(
650+
"""
651+
|SELECT t11.a,
652+
| t11.b,
653+
| t11.rn
654+
|FROM (SELECT *, Row_number() OVER (PARTITION BY a ORDER BY b) rn FROM t1) t11
655+
| JOIN t3
656+
| ON t11.a = t3.a AND t3.b < 2
657+
|""".stripMargin)
658+
659+
checkDataPruningPredicate(df, false, true)
660+
checkAnswer(df, Row(0, 0, 1) :: Row(1, 1, 1) :: Nil)
661+
}
662+
}
663+
664+
// Counterpart of the PARTITION BY a window test: here the window is partitioned by b and
// ordered by a, so the join key a is not a window partition column. Asserts
// checkDataPruningPredicate(df, false, false) while the query result stays the same two rows.
// NOTE(review): the meaning of the two boolean flags is defined by checkDataPruningPredicate,
// which is not shown here -- presumably the second one means "shuffle pruning expected";
// confirm against the helper.
test("Window should not triggers shuffle pruning if can't push through Shuffle") {
665+
withSQLConf(SQLConf.DYNAMIC_DATA_PRUNING_ENABLED.key -> "true",
666+
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "3K",
667+
SQLConf.DYNAMIC_DATA_PRUNING_SIDE_THRESHOLD.key -> "10K") {
668+
val df = sql(
669+
"""
670+
|SELECT t11.a,
671+
| t11.b,
672+
| t11.rn
673+
|FROM (SELECT *, Row_number() OVER (PARTITION BY b ORDER BY a) rn FROM t1) t11
674+
| JOIN t3
675+
| ON t11.a = t3.a AND t3.b < 2
676+
|""".stripMargin)
677+
678+
checkDataPruningPredicate(df, false, false)
679+
checkAnswer(df, Row(0, 0, 1) :: Row(1, 1, 1) :: Nil)
680+
}
681+
}
682+
627683
test("CARMEL-5442: Fall back to true if InSet size exceed DYNAMIC_PRUNING_MAX_INSET_NUM") {
628684
withSQLConf(SQLConf.DYNAMIC_DATA_PRUNING_ENABLED.key -> "true",
629685
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "3K",

0 commit comments

Comments
 (0)