From 2c98f3cd70a07372ae06ff94443ac8515730f639 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 22 Dec 2023 14:41:57 +0800 Subject: [PATCH] V1Write should not add Sort when not needed --- .../sql/execution/datasources/V1Writes.scala | 4 +- .../datasources/V1WriteCommandSuite.scala | 54 +++++++------------ 2 files changed, 20 insertions(+), 38 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala index b1d2588ede627..d7a8d7aec0b7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala @@ -102,8 +102,8 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper { val requiredOrdering = write.requiredOrdering.map(_.transform { case a: Attribute => attrMap.getOrElse(a, a) }.asInstanceOf[SortOrder]) - val outputOrdering = query.outputOrdering - val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering) + val outputOrdering = empty2NullPlan.outputOrdering + val orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering) if (orderingMatched) { empty2NullPlan } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala index 3ca516463d367..ce43edb79c127 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala @@ -223,24 +223,15 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write } // assert the outer most sort in the executed plan - val sort = plan.collectFirst { case s: SortExec => s } - if (enabled) { - // With planned write, optimizer is more efficient and can eliminate the `SORT BY`. - assert(sort.exists { - case SortExec(Seq( - SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _) - ), false, _, _) => true - case _ => false - }, plan) - } else { - assert(sort.exists { - case SortExec(Seq( - SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _), - SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _) - ), false, _, _) => true - case _ => false - }, plan) - } + assert(plan.collectFirst { + case s: SortExec => s + }.exists { + case SortExec(Seq( + SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _), + SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _) + ), false, _, _) => true + case _ => false + }, plan) } } } @@ -279,24 +270,15 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write } // assert the outer most sort in the executed plan - val sort = plan.collectFirst { case s: SortExec => s } - if (enabled) { - // With planned write, optimizer is more efficient and can eliminate the `SORT BY`. - assert(sort.exists { - case SortExec(Seq( - SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _) - ), false, _, _) => true - case _ => false - }, plan) - } else { - assert(sort.exists { - case SortExec(Seq( - SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _), - SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _) - ), false, _, _) => true - case _ => false - }, plan) - } + assert(plan.collectFirst { + case s: SortExec => s + }.exists { + case SortExec(Seq( + SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _), + SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _) + ), false, _, _) => true + case _ => false + }, plan) } } }