From 8bd70c9122d5f0a080c53b7d62dd5ab3dbf34d2e Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 22 Mar 2022 18:40:13 +0800 Subject: [PATCH 1/4] Avoid unnecessary sort in FileFormatWriter if user has specified sort --- .../sql/execution/adaptive/AQEUtils.scala | 21 +++++++++++++++ .../adaptive/AdaptiveSparkPlanExec.scala | 15 ++++++++--- .../exchange/EnsureRequirements.scala | 15 +++++++++-- .../exchange/ValidateRequirements.scala | 8 ++++++ .../adaptive/AdaptiveQueryExecSuite.scala | 26 +++++++++++++++---- 5 files changed, 75 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala index 51833012a128e..fbb06209b5c78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.adaptive +import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashPartitioning, UnspecifiedDistribution} import org.apache.spark.sql.execution.{CollectMetricsExec, FilterExec, ProjectExec, SortExec, SparkPlan} import org.apache.spark.sql.execution.exchange.{REPARTITION_BY_COL, REPARTITION_BY_NUM, ShuffleExchangeExec} @@ -57,4 +58,24 @@ object AQEUtils { } case _ => Some(UnspecifiedDistribution) } + + // Analyze the given plan and calculate the required ordering of this plan w.r.t. the + // user-specified sort. + def getRequiredOrdering(p: SparkPlan): Seq[SortOrder] = p match { + case f: FilterExec => getRequiredOrdering(f.child) + case c: CollectMetricsExec => getRequiredOrdering(c.child) + // We do not need to care whether the sort is global or not, since the output partitioning + // is ensured by requiredDistribution. 
+    case s: SortExec => s.outputOrdering
+    case p: ProjectExec =>
+      val requiredOrdering = getRequiredOrdering(p.child)
+      // Avoid cases like `df.sort(a, b).select(c)`, where the projection drops a sort column.
+      if (requiredOrdering.map(_.child).forall(e => p.projectList.exists(_.semanticEquals(e)))) {
+        requiredOrdering
+      } else {
+        Nil
+      }
+
+    case _ => Nil
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index c6505a0ea5f73..9bea29f728a44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -30,7 +30,7 @@ import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution}
 import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
@@ -97,6 +97,13 @@ case class AdaptiveSparkPlanExec(
     AQEUtils.getRequiredDistribution(inputPlan)
   }
 
+  // Make sure AQE does not change the user-specified output ordering
+  @transient private val requiredOrdering: Seq[SortOrder] = if (isSubquery) {
+    Nil
+  } else {
+    AQEUtils.getRequiredOrdering(inputPlan)
+  }
+
   @transient private val costEvaluator =
     conf.getConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS) match {
       case Some(className) => CostEvaluator.instantiate(className, session.sparkContext.getConf)
@@ -112,7 +119,7 @@ case class AdaptiveSparkPlanExec(
     // `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work
     // around this case.
     val ensureRequirements =
-      EnsureRequirements(requiredDistribution.isDefined, requiredDistribution)
+      EnsureRequirements(requiredDistribution.isDefined, requiredDistribution, requiredOrdering)
     Seq(
       RemoveRedundantProjects,
       ensureRequirements,
@@ -163,7 +170,7 @@ case class AdaptiveSparkPlanExec(
       } else {
         UnspecifiedDistribution
       }
-      if (ValidateRequirements.validate(applied, distribution)) {
+      if (ValidateRequirements.validate(applied, distribution, requiredOrdering)) {
         applied
       } else {
         logDebug(s"Rule ${rule.ruleName} is not applied as it breaks the " +
@@ -207,6 +214,8 @@ case class AdaptiveSparkPlanExec(
 
   override def output: Seq[Attribute] = inputPlan.output
 
+  override def outputOrdering: Seq[SortOrder] = requiredOrdering
+
   override def doCanonicalize(): SparkPlan = inputPlan.canonicalized
 
   override def resetMetrics(): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index de1806ab87b4c..b9837311baedc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -40,10 +40,13 @@ import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoin
  * repartition shuffles in the plan.
  * @param requiredDistribution The root required distribution we should ensure. This value is used
  *                             in AQE in case we change final stage output partitioning.
+ * @param requiredOrdering The root required ordering we should ensure. This value is used + * in AQE in case we change final stage output ordering. */ case class EnsureRequirements( optimizeOutRepartition: Boolean = true, - requiredDistribution: Option[Distribution] = None) + requiredDistribution: Option[Distribution] = None, + requiredOrdering: Seq[SortOrder] = Nil) extends Rule[SparkPlan] { private def ensureDistributionAndOrdering( @@ -333,10 +336,18 @@ case class EnsureRequirements( val finalPlan = ensureDistributionAndOrdering( newPlan :: Nil, requiredDistribution.get :: Nil, - Seq(Nil), + Seq(requiredOrdering), shuffleOrigin) assert(finalPlan.size == 1) finalPlan.head + } else if (requiredOrdering.nonEmpty) { + val finalPlan = ensureDistributionAndOrdering( + newPlan :: Nil, + Nil, + Seq(requiredOrdering), + ENSURE_REQUIREMENTS) + assert(finalPlan.size == 1) + finalPlan.head } else { newPlan } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ValidateRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ValidateRequirements.scala index 1ac6b809fd250..b431608db3f92 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ValidateRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ValidateRequirements.scala @@ -30,6 +30,14 @@ import org.apache.spark.sql.execution._ */ object ValidateRequirements extends Logging { + def validate( + plan: SparkPlan, + requiredDistribution: Distribution, + requiredOrdering: Seq[SortOrder]): Boolean = { + validate(plan, requiredDistribution) && + SortOrder.orderingSatisfies(plan.outputOrdering, requiredOrdering) + } + def validate(plan: SparkPlan, requiredDistribution: Distribution): Boolean = { validate(plan) && plan.outputPartitioning.satisfies(requiredDistribution) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 76741dc4d08e0..041a9bf31bfec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1468,8 +1468,8 @@ class AdaptiveQueryExecSuite val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult( "SELECT key FROM (SELECT * FROM testData WHERE value = 'no_match' ORDER BY key)" + " WHERE key > rand()") - assert(findTopLevelSort(plan2).size == 1) - assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec]) + assert(findTopLevelSort(plan2).head.global) + assert(!findTopLevelSort(adaptivePlan2).head.global) } } @@ -2535,13 +2535,13 @@ class AdaptiveQueryExecSuite (1 to 4).map(i => TestData(i, i.toString)), 2) .toDF("c1", "c2").createOrReplaceTempView("v") - // remove sort + // remove global sort val (origin1, adaptive1) = runAdaptiveAndVerifyResult( """ |SELECT * FROM v where c1 = 1 order by c1, c2 |""".stripMargin) - assert(findTopLevelSort(origin1).size == 1) - assert(findTopLevelSort(adaptive1).isEmpty) + assert(findTopLevelSort(origin1).head.global) + assert(!findTopLevelSort(adaptive1).head.global) // convert group only aggregate to project val (origin2, adaptive2) = runAdaptiveAndVerifyResult( @@ -2575,6 +2575,22 @@ class AdaptiveQueryExecSuite assert(findTopLevelAggregate(adaptive5).size == 4) } } + + test("SPARK-38578: Avoid unnecessary sort in FileFormatWriter if user has specified sort") { + Seq( + ("key", "key, 
value", false), + ("key as x", "key, value", false), + ("key", "key", true), + ("key, value", "key", true) + ).foreach { case (project, sort, required) => + val (origin, adaptive) = runAdaptiveAndVerifyResult( + s""" + |SELECT $project FROM testdata where key < 0 ORDER BY $sort + |""".stripMargin) + assert(findTopLevelSort(origin).size == 1) + assert(findTopLevelSort(adaptive).nonEmpty == required) + } + } } /** From ef1e713bd3e823f30cfbc5f2f4932e5dab029cac Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 22 Mar 2022 22:14:20 +0800 Subject: [PATCH 2/4] code comments --- .../org/apache/spark/sql/execution/adaptive/AQEUtils.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala index fbb06209b5c78..9f71d3e4889eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala @@ -62,6 +62,8 @@ object AQEUtils { // Analyze the given plan and calculate the required ordering of this plan w.r.t. the // user-specified sort. def getRequiredOrdering(p: SparkPlan): Seq[SortOrder] = p match { + // User-specified repartition is only effective when it's the root node, or under + // Project/Filter/CollectMetrics. case f: FilterExec => getRequiredOrdering(f.child) case c: CollectMetricsExec => getRequiredOrdering(c.child) // We do not need to care whether the sort is global or not, since the output partitioning From 3859727f346aed71acb7e5b4c2f65f3069f56f2b Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Wed, 23 Mar 2022 09:49:29 +0800 Subject: [PATCH 3/4] test name --- .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 041a9bf31bfec..87c72650637f0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -2576,7 +2576,7 @@ class AdaptiveQueryExecSuite } } - test("SPARK-38578: Avoid unnecessary sort in FileFormatWriter if user has specified sort") { + test("SPARK-38578: AdaptiveSparkPlanExec should ensure user-specified ordering") { Seq( ("key", "key, value", false), ("key as x", "key, value", false), From 602c33caec91fbae3d9fe2bf95d055130e33100b Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Wed, 23 Mar 2022 12:13:09 +0800 Subject: [PATCH 4/4] address comment --- .../org/apache/spark/sql/execution/adaptive/AQEUtils.scala | 2 +- .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala index 9f71d3e4889eb..949e048c5286c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala @@ -62,7 +62,7 @@ object AQEUtils { // Analyze the given plan and calculate the required ordering of this plan w.r.t. the // user-specified sort. 
   def getRequiredOrdering(p: SparkPlan): Seq[SortOrder] = p match {
-    // User-specified repartition is only effective when it's the root node, or under
+    // User-specified sort is only effective when it's the root node, or under
     // Project/Filter/CollectMetrics.
     case f: FilterExec => getRequiredOrdering(f.child)
     case c: CollectMetricsExec => getRequiredOrdering(c.child)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 87c72650637f0..006ec78d67b26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2583,6 +2583,8 @@ class AdaptiveQueryExecSuite
       ("key", "key", true),
       ("key, value", "key", true)
     ).foreach { case (project, sort, required) =>
+      // During AQE re-optimization, the sort is converted to a local relation if its input is
+      // empty, so this test ensures that we add the sort back when it is user-specified.
       val (origin, adaptive) = runAdaptiveAndVerifyResult(
         s"""
           |SELECT $project FROM testdata where key < 0 ORDER BY $sort
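
For readers who want to see the motivating case end to end, here is a minimal, hypothetical sketch (assuming a Spark build with this patch, a local SparkSession, and an illustrative output path; `Spark38578Sketch` is not part of the patch). The user already orders the data by the partition column, and because `AdaptiveSparkPlanExec` now reports `outputOrdering`, `FileFormatWriter` can see that ordering and skip the extra sort it would otherwise insert before writing:

```scala
import org.apache.spark.sql.SparkSession

object Spark38578Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-38578 sketch")
      .getOrCreate()
    import spark.implicits._

    // 100 rows spread over 10 partition-column values.
    val df = (1 to 100).map(i => (i % 10, i)).toDF("p", "v")

    // The user-specified sort already orders rows by the partition column `p`.
    // Before this patch AdaptiveSparkPlanExec exposed no outputOrdering, so the
    // writer inserted a second sort on `p`; with the patch the ordering is
    // propagated and the redundant sort can be avoided.
    df.orderBy("p")
      .write
      .mode("overwrite")
      .partitionBy("p")
      .parquet("/tmp/spark-38578-sketch")

    spark.stop()
  }
}
```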
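The Project case in `AQEUtils.getRequiredOrdering` is the subtle part: the ordering only survives a projection when every sort column is still present in the project list. A hypothetical spark-shell fragment (column names are illustrative) mirroring the rows of the test matrix added in patch 1:

```scala
// Assumes a spark-shell session, where `spark` and its implicits are in scope.
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("key", "value")

// The sort column `key` survives the projection, so the user-specified
// ordering can still be required on the output: the ("key, value", "key", true)
// row of the test matrix.
df.orderBy("key").select("key", "value")

// The projection drops `value` (or aliases `key` away), so the required
// ordering collapses to Nil and AQE need not re-add the sort: the
// ("key", "key, value", false) and ("key as x", "key, value", false) rows.
df.orderBy("key", "value").select("key")
df.orderBy("key", "value").selectExpr("key as x")
```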