Skip to content

Commit a1ec6fb

Browse files
wangyum (GitHub Enterprise)
authored and committed
[CARMEL-6057] Fix limit push down through window (#1016)
* Fix limit push down through window * fix * fix * fix * flaky test
1 parent dbb8d88 commit a1ec6fb

File tree

3 files changed

+7
-101
lines changed

3 files changed

+7
-101
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ object LimitPushDownThroughWindow extends Rule[LogicalPlan] with AliasHelper {
6161
attr: Attribute, order: Seq[SortOrder], window: Window): Boolean = {
6262
window.windowExpressions.size == 1 &&
6363
getAliasMap(window.windowExpressions).get(attr).exists(a => pushableWindowExprs(a :: Nil)) &&
64-
(order.isEmpty || (window.partitionSpec.nonEmpty &&
64+
(order.isEmpty || (window.partitionSpec.nonEmpty && window.partitionSpec.size >= order.size &&
6565
window.partitionSpec.zip(order).forall {
6666
case (e: Expression, s: SortOrder) => s.child.semanticEquals(e)
6767
}))
@@ -76,7 +76,8 @@ object LimitPushDownThroughWindow extends Rule[LogicalPlan] with AliasHelper {
7676
private def pushedOrder(order: Seq[SortOrder], window: Window): Seq[SortOrder] = {
7777
val windowSize = window.partitionSpec.size
7878
order.take(windowSize) ++
79-
sortWindowSpec(window.partitionSpec.takeRight(windowSize - order.size), window.orderSpec)
79+
sortWindowSpec(window.partitionSpec.takeRight(windowSize - order.size), window.orderSpec) ++
80+
order.takeRight(order.size - windowSize).filter(_.references.subsetOf(window.child.outputSet))
8081
}
8182

8283
def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
@@ -105,13 +106,6 @@ object LimitPushDownThroughWindow extends Rule[LogicalPlan] with AliasHelper {
105106
limit < conf.topKSortFallbackThreshold && supportsPushDown(attr, Nil, window) =>
106107
val newWindowChild = Limit(limitExpr, Sort(pushedOrder(Nil, window), true, window.child))
107108
f.copy(child = window.copy(child = newWindowChild))
108-
109-
case Limit(limitExpr @ IntegerLiteral(limit), s @ Sort(order, _,
110-
f @ Filter(ExtractFilterCondition(attr, filterVal), window: Window)))
111-
if limit < filterVal && window.maxRows.forall(_ > limit) &&
112-
limit < conf.topKSortFallbackThreshold && supportsPushDown(attr, order, window) =>
113-
val newWindowChild = Limit(limitExpr, Sort(pushedOrder(order, window), true, window.child))
114-
s.copy(child = f.copy(child = window.copy(child = newWindowChild)))
115109
}
116110

117111
private object ExtractFilterCondition {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala

Lines changed: 3 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,8 @@ class LimitPushdownThroughWindowSuite extends PlanTest {
228228
}
229229

230230
test("SPARK-39600: Limit should LessThan/LessThanOrEqual filter value") {
231-
Seq(3, 4, 5, 6).foreach { limitVal =>
232-
Seq(3, 4, 5, 6).foreach { filterVal =>
231+
Seq(40, 80, 120).foreach { limitVal =>
232+
Seq(40, 80, 120).foreach { filterVal =>
233233
val originalQuery = testRelation
234234
.select(a, b, c,
235235
windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"))
@@ -242,35 +242,7 @@ class LimitPushdownThroughWindowSuite extends PlanTest {
242242
windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"))
243243
.where('rn < Literal(filterVal))
244244

245-
if (limitVal < filterVal) {
246-
comparePlans(
247-
Optimize.execute(originalQuery.analyze),
248-
WithoutOptimize.execute(correctAnswer.analyze))
249-
} else {
250-
comparePlans(
251-
Optimize.execute(originalQuery.analyze),
252-
WithoutOptimize.execute(originalQuery.analyze))
253-
}
254-
}
255-
}
256-
257-
Seq(3, 4, 5, 6).foreach { limitVal =>
258-
Seq(3, 4, 5, 6).foreach { filterVal =>
259-
val originalQuery = testRelation
260-
.select(a, b, c,
261-
windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"))
262-
.where('rn <= Literal(filterVal))
263-
.sortBy(a.asc, b.asc)
264-
.limit(limitVal)
265-
val correctAnswer = testRelation
266-
.orderBy(a.asc, c.desc)
267-
.limit(limitVal)
268-
.select(a, b, c,
269-
windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"))
270-
.where('rn <= Literal(filterVal))
271-
.sortBy(a.asc, b.asc)
272-
273-
if (limitVal <= filterVal) {
245+
if (limitVal < filterVal && limitVal < 100) {
274246
comparePlans(
275247
Optimize.execute(originalQuery.analyze),
276248
WithoutOptimize.execute(correctAnswer.analyze))
@@ -283,26 +255,6 @@ class LimitPushdownThroughWindowSuite extends PlanTest {
283255
}
284256
}
285257

286-
test("SPARK-39600: Push down if partitionSpec is the prefix of sort exprs") {
287-
val originalQuery = testRelation
288-
.select(a, b, c,
289-
windowExpr(RowNumber(), windowSpec(b :: Nil, c.desc :: Nil, windowFrame)).as("rn"))
290-
.where('rn <= Literal(5))
291-
.orderBy(b.asc)
292-
.limit(5)
293-
val correctAnswer = testRelation
294-
.orderBy(b.asc, c.desc)
295-
.limit(5)
296-
.select(a, b, c,
297-
windowExpr(RowNumber(), windowSpec(b :: Nil, c.desc :: Nil, windowFrame)).as("rn"))
298-
.where('rn <= Literal(5))
299-
.orderBy(b.asc)
300-
301-
comparePlans(
302-
Optimize.execute(originalQuery.analyze),
303-
WithoutOptimize.execute(correctAnswer.analyze))
304-
}
305-
306258
test("SPARK-39600: Push down if sort exprs is not exist") {
307259
val originalQuery = testRelation
308260
.select(a, b, c,
@@ -321,46 +273,6 @@ class LimitPushdownThroughWindowSuite extends PlanTest {
321273
WithoutOptimize.execute(correctAnswer.analyze))
322274
}
323275

324-
test("SPARK-39600: Push down should keep the origin order") {
325-
val originalQuery = testRelation
326-
.select(a, b, c,
327-
windowExpr(RowNumber(), windowSpec(b :: Nil, c.desc :: Nil, windowFrame)).as("rn"))
328-
.where('rn <= Literal(5))
329-
.orderBy(b.desc, c.asc, a.desc)
330-
.limit(5)
331-
val correctAnswer = testRelation
332-
.orderBy(b.desc, c.desc)
333-
.limit(5)
334-
.select(a, b, c,
335-
windowExpr(RowNumber(), windowSpec(b :: Nil, c.desc :: Nil, windowFrame)).as("rn"))
336-
.where('rn <= Literal(5))
337-
.orderBy(b.desc, c.asc, a.desc)
338-
339-
comparePlans(
340-
Optimize.execute(originalQuery.analyze),
341-
WithoutOptimize.execute(correctAnswer.analyze))
342-
}
343-
344-
test("SPARK-39600: Order exprs is subset of partitionSpec") {
345-
val originalQuery = testRelation
346-
.select(a, b, c,
347-
windowExpr(RowNumber(), windowSpec(a :: b :: Nil, c.desc :: Nil, windowFrame)).as("rn"))
348-
.where('rn <= Literal(5))
349-
.orderBy(a.desc)
350-
.limit(5)
351-
val correctAnswer = testRelation
352-
.orderBy(a.desc, b.asc, c.desc)
353-
.limit(5)
354-
.select(a, b, c,
355-
windowExpr(RowNumber(), windowSpec(a :: b :: Nil, c.desc :: Nil, windowFrame)).as("rn"))
356-
.where('rn <= Literal(5))
357-
.orderBy(a.desc)
358-
359-
comparePlans(
360-
Optimize.execute(originalQuery.analyze),
361-
WithoutOptimize.execute(correctAnswer.analyze))
362-
}
363-
364276
test("SPARK-39600: Unsupported case: do not push down if partitionSpec is empty") {
365277
val originalQuery = testRelation
366278
.select(a, b, c,

sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ abstract class DisableUnnecessaryBucketedScanSuite extends QueryTest
377377
"""
378378
SELECT * FROM
379379
(SELECT j, row_number() OVER (partition by j order by i) rn, i
380-
FROM t1 order by j) t
380+
FROM t1 order by j, rn) t
381381
WHERE rn <= 3
382382
""".stripMargin, 0, 1),
383383

0 commit comments

Comments (0)