Commit 34780ad

wangyum authored, GitHub Enterprise committed

[CARMEL-6077] Use TakeOrderedAndProject if maxRows below the topKSortMaxRowsThreshold (#1015)

* Use TakeOrderedAndProject if maxRows below the topKSortMaxRowsThreshold
* Fix

1 parent de7f557, commit 34780ad

File tree: 4 files changed, +59 −1 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 0 deletions

@@ -2151,6 +2151,17 @@ object SQLConf {
     .intConf
     .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
 
+  val TOP_K_SORT_MAX_ROWS_THRESHOLD = buildConf("spark.sql.execution.topKSortMaxRowsThreshold")
+    .doc("In SQL queries with a sort whose max rows is known, such as " +
+      "'SELECT DISTINCT x FROM t ORDER BY y', if max rows is under this threshold, do a top-K " +
+      "sort in memory to avoid doing a range repartition.")
+    .version("3.4.0")
+    .intConf
+    .checkValue(_ <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
+      "The top-K sort max rows threshold should be less than or equal to " +
+        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+    .createWithDefault(655360)
+
   val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
       .internal()
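
The new threshold is a public conf (it is not marked `.internal()`), so it can be adjusted per session. A minimal sketch of tuning it at runtime, reusing the values the tests below exercise; the active `SparkSession` named `spark` is an assumption of this sketch, not part of the commit:

```scala
// Lower the threshold for this session (10, as in the tests below).
spark.conf.set("spark.sql.execution.topKSortMaxRowsThreshold", 10)

// A negative value such as -1 effectively disables the optimization,
// since no plan's maxRows can be below it (checkValue only bounds the
// conf from above, so negative values are accepted).
spark.conf.set("spark.sql.execution.topKSortMaxRowsThreshold", -1)

// Revert to the default of 655360.
spark.conf.unset("spark.sql.execution.topKSortMaxRowsThreshold")
```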

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 15 additions & 0 deletions

@@ -100,8 +100,23 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
           if limit < conf.topKSortFallbackThreshold =>
         TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
+      case Sort(order, true, child) if supportTakeOrdered(child) =>
+        val limit = child.maxRows.get.toInt
+        TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
+      case Project(projectList, Sort(order, true, child)) if supportTakeOrdered(child) =>
+        val limit = child.maxRows.get.toInt
+        TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
       case _ => Nil
     }
+
+    private def supportTakeOrdered(plan: LogicalPlan): Boolean = {
+      plan.maxRows.exists(_ < conf.getConf(SQLConf.TOP_K_SORT_MAX_ROWS_THRESHOLD)) &&
+        // The plan should not already contain a sort, to avoid sorting it again.
+        plan.collectFirst {
+          case s: Sort => s.global
+        }.isEmpty
+    }
  }

  /**
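
To make the two new cases concrete, here is a hedged sketch of a query shape they can match, modeled on the conf doc's 'SELECT DISTINCT x FROM t ORDER BY y' motivation; it assumes a small local relation so that `maxRows` is known, and `spark.implicits._` in scope:

```scala
import spark.implicits._

// The local relation reports maxRows == Some(5) and contains no Sort, so
// supportTakeOrdered(child) holds: the global ORDER BY can be planned as
// TakeOrderedAndProjectExec(5, ...) instead of a range repartition + sort.
val df = Seq(1 -> "a", 2 -> "b", 3 -> "c", 4 -> "d", 5 -> "e")
  .toDF("x", "y")
  .distinct()     // DISTINCT keeps maxRows bounded by its child's maxRows
  .orderBy($"x")  // matches `case Sort(order, true, child)` above
```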

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala

Lines changed: 2 additions & 0 deletions

@@ -53,4 +53,6 @@ case class LogicalQueryStage(
     }
     physicalStats.getOrElse(logicalPlan.stats)
   }
+
+  override def maxRows: Option[Long] = stats.rowCount.map(_.min(Long.MaxValue).toLong)
 }
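
`LogicalQueryStage` previously inherited the default `maxRows` of `None`, so AQE's re-optimized plans could never satisfy `supportTakeOrdered`; this override surfaces the runtime row-count statistic instead. A self-contained sketch of just the clamping logic, assuming `rowCount` is an `Option[BigInt]` as in Catalyst's `Statistics` (the committed one-liner relies on the implicit Long-to-BigInt conversion, made explicit here):

```scala
// Standalone illustration (not Spark code): clamp a BigInt row count into
// the Long-valued maxRows contract, saturating at Long.MaxValue.
def toMaxRows(rowCount: Option[BigInt]): Option[Long] =
  rowCount.map(_.min(BigInt(Long.MaxValue)).toLong)

assert(toMaxRows(Some(BigInt(42))) == Some(42L))
assert(toMaxRows(Some(BigInt(Long.MaxValue) * 2)) == Some(Long.MaxValue)) // saturated
assert(toMaxRows(None).isEmpty)
```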

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 31 additions & 1 deletion

@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range, Repartition, Sort, Union}
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ShuffleExchangeExec}
@@ -977,6 +977,36 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
     assert(PartitioningUtil.extractPartitioning[CoalescedPartitioning](pc3) ==
       c :: c :: c :: Nil)
   }
+
+  test("SPARK-39698: Use TakeOrderedAndProject if maxRows below the topKSortMaxRowsThreshold") {
+    Seq(-1, 10).foreach { threshold =>
+      withSQLConf(SQLConf.TOP_K_SORT_MAX_ROWS_THRESHOLD.key -> threshold.toString,
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+        val df = Seq(1 -> "a", 2 -> "b", 3 -> "c", 4 -> "d", 5 -> "e").toDF("i", "j").orderBy($"i")
+        df.collect()
+        val aqePlan = df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+        if (threshold < 0) {
+          assert(aqePlan.currentPhysicalPlan.isInstanceOf[execution.WholeStageCodegenExec])
+        } else {
+          assert(aqePlan.currentPhysicalPlan.isInstanceOf[TakeOrderedAndProjectExec])
+        }
+      }
+    }
+  }
+
+  test("SPARK-39698: Do not use TakeOrderedAndProject if children contain global sort") {
+    Seq(-1, 10).foreach { threshold =>
+      withSQLConf(SQLConf.TOP_K_SORT_MAX_ROWS_THRESHOLD.key -> threshold.toString,
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+        val df = Seq(1 -> "a", 2 -> "b", 3 -> "c", 4 -> "d", 5 -> "e").toDF("i", "j")
+          .orderBy($"i").limit(4).orderBy($"i")
+        df.collect()
+        val aqePlan = df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+        // The last sort should be removed by RemoveRedundantSorts.
+        assert(aqePlan.currentPhysicalPlan.isInstanceOf[TakeOrderedAndProjectExec])
+      }
+    }
+  }
 }

 // Used for unit-testing EnsureRequirements
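
Beyond the suite, the change is easy to observe interactively; a hedged spark-shell sketch (assuming AQE enabled, the default in recent Spark versions):

```scala
import spark.implicits._

val df = Seq(1 -> "a", 2 -> "b", 3 -> "c").toDF("i", "j").orderBy($"i")
df.collect()
// With the default threshold, the final adaptive plan should contain
// TakeOrderedAndProject rather than Exchange(rangepartitioning) + Sort.
println(df.queryExecution.executedPlan)
```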
