Skip to content

Commit ce3437a

Browse files
committed
Revert "[SPARK-53738][SQL] PlannedWrite should preserve custom sort order when query output contains literal"
This reverts commit 22d9709.
1 parent 2a5d03a commit ce3437a

File tree

16 files changed

+34
-179
lines changed

16 files changed

+34
-179
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,6 @@ package object dsl extends SQLConfHelper {
151151

152152
def asc: SortOrder = SortOrder(expr, Ascending)
153153
def asc_nullsLast: SortOrder = SortOrder(expr, Ascending, NullsLast, Seq.empty)
154-
def const: SortOrder = SortOrder(expr, Constant)
155154
def desc: SortOrder = SortOrder(expr, Descending)
156155
def desc_nullsFirst: SortOrder = SortOrder(expr, Descending, NullsFirst, Seq.empty)
157156
def as(alias: String): NamedExpression = Alias(expr, alias)()

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala

Lines changed: 11 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,6 @@ case object Descending extends SortDirection {
4545
override def defaultNullOrdering: NullOrdering = NullsLast
4646
}
4747

48-
case object Constant extends SortDirection {
49-
override def sql: String = "CONST"
50-
override def defaultNullOrdering: NullOrdering = NullsFirst
51-
}
52-
5348
case object NullsFirst extends NullOrdering {
5449
override def sql: String = "NULLS FIRST"
5550
}
@@ -74,13 +69,8 @@ case class SortOrder(
7469

7570
override def children: Seq[Expression] = child +: sameOrderExpressions
7671

77-
override def checkInputDataTypes(): TypeCheckResult = {
78-
if (direction == Constant) {
79-
TypeCheckResult.TypeCheckSuccess
80-
} else {
81-
TypeUtils.checkForOrderingExpr(dataType, prettyName)
82-
}
83-
}
72+
override def checkInputDataTypes(): TypeCheckResult =
73+
TypeUtils.checkForOrderingExpr(dataType, prettyName)
8474

8575
override def dataType: DataType = child.dataType
8676
override def nullable: Boolean = child.nullable
@@ -91,8 +81,8 @@ case class SortOrder(
9181
def isAscending: Boolean = direction == Ascending
9282

9383
def satisfies(required: SortOrder): Boolean = {
94-
children.exists(required.child.semanticEquals) && (direction == Constant ||
95-
direction == required.direction && nullOrdering == required.nullOrdering)
84+
children.exists(required.child.semanticEquals) &&
85+
direction == required.direction && nullOrdering == required.nullOrdering
9686
}
9787

9888
override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): SortOrder =
@@ -111,38 +101,21 @@ object SortOrder {
111101
* Returns if a sequence of SortOrder satisfies another sequence of SortOrder.
112102
*
113103
* SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
114-
* or of A's prefix, except for SortOrder in B that satisfies any constant SortOrder in A.
115-
*
116-
* Here are examples of ordering A satisfying ordering B:
104+
* or of A's prefix. Here are examples of ordering A satisfying ordering B:
117105
* <ul>
118106
* <li>ordering A is [x, y] and ordering B is [x]</li>
119-
* <li>ordering A is [z(const), x, y] and ordering B is [x, z]</li>
120107
* <li>ordering A is [x(sameOrderExpressions=x1)] and ordering B is [x1]</li>
121108
* <li>ordering A is [x(sameOrderExpressions=x1), y] and ordering B is [x1]</li>
122109
* </ul>
123110
*/
124-
def orderingSatisfies(
125-
providedOrdering: Seq[SortOrder], requiredOrdering: Seq[SortOrder]): Boolean = {
126-
if (requiredOrdering.isEmpty) {
127-
return true
128-
}
129-
130-
val (constantProvidedOrdering, nonConstantProvidedOrdering) = providedOrdering.partition {
131-
case SortOrder(_, Constant, _, _) => true
132-
case SortOrder(child, _, _, _) => child.foldable
133-
}
134-
135-
val effectiveRequiredOrdering = requiredOrdering.filterNot { requiredOrder =>
136-
constantProvidedOrdering.exists { providedOrder =>
137-
providedOrder.satisfies(requiredOrder)
138-
}
139-
}
140-
141-
if (effectiveRequiredOrdering.length > nonConstantProvidedOrdering.length) {
111+
def orderingSatisfies(ordering1: Seq[SortOrder], ordering2: Seq[SortOrder]): Boolean = {
112+
if (ordering2.isEmpty) {
113+
true
114+
} else if (ordering2.length > ordering1.length) {
142115
false
143116
} else {
144-
effectiveRequiredOrdering.zip(nonConstantProvidedOrdering).forall {
145-
case (required, provided) => provided.satisfies(required)
117+
ordering2.zip(ordering1).forall {
118+
case (o2, o1) => o1.satisfies(o2)
146119
}
147120
}
148121
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
2020
import org.apache.spark.SparkIllegalArgumentException
2121
import org.apache.spark.sql.catalyst.InternalRow
2222
import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, UnresolvedWithinGroup}
23-
import org.apache.spark.sql.catalyst.expressions.{Ascending, Constant, Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder}
23+
import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder}
2424
import org.apache.spark.sql.catalyst.expressions.Cast.toSQLExpr
2525
import org.apache.spark.sql.catalyst.trees.UnaryLike
2626
import org.apache.spark.sql.catalyst.types.PhysicalDataType
@@ -199,8 +199,6 @@ case class Mode(
199199
this.copy(child = child, reverseOpt = Some(true))
200200
case SortOrder(child, Descending, _, _) =>
201201
this.copy(child = child, reverseOpt = Some(false))
202-
case SortOrder(child, Constant, _, _) =>
203-
this.copy(child = child)
204202
}
205203
case _ => this
206204
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ case class PercentileCont(left: Expression, right: Expression, reverse: Boolean
382382
nodeName, 1, orderingWithinGroup.length)
383383
}
384384
orderingWithinGroup.head match {
385-
case SortOrder(child, Ascending | Constant, _, _) => this.copy(left = child)
385+
case SortOrder(child, Ascending, _, _) => this.copy(left = child)
386386
case SortOrder(child, Descending, _, _) => this.copy(left = child, reverse = true)
387387
}
388388
}
@@ -440,7 +440,7 @@ case class PercentileDisc(
440440
nodeName, 1, orderingWithinGroup.length)
441441
}
442442
orderingWithinGroup.head match {
443-
case SortOrder(expr, Ascending | Constant, _, _) => this.copy(child = expr)
443+
case SortOrder(expr, Ascending, _, _) => this.copy(child = expr)
444444
case SortOrder(expr, Descending, _, _) => this.copy(child = expr, reverse = true)
445445
}
446446
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1919,7 +1919,7 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
19191919
object EliminateSorts extends Rule[LogicalPlan] {
19201920
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(_.containsPattern(SORT)) {
19211921
case s @ Sort(orders, _, child, _) if orders.isEmpty || orders.exists(_.child.foldable) =>
1922-
val newOrders = orders.filterNot(o => o.direction != Constant && o.child.foldable)
1922+
val newOrders = orders.filterNot(_.child.foldable)
19231923
if (newOrders.isEmpty) {
19241924
child
19251925
} else {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans
2020
import scala.collection.mutable
2121

2222
import org.apache.spark.sql.catalyst.SQLConfHelper
23-
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Constant, Empty2Null, Expression, NamedExpression, SortOrder}
23+
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Empty2Null, Expression, NamedExpression, SortOrder}
2424
import org.apache.spark.sql.internal.SQLConf
2525

2626
/**
@@ -128,8 +128,6 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
128128
}
129129
}
130130
}
131-
newOrdering.takeWhile(_.isDefined).flatten.toSeq ++ outputExpressions.collect {
132-
case a @ Alias(child, _) if child.foldable => SortOrder(a.toAttribute, Constant)
133-
}
131+
newOrdering.takeWhile(_.isDefined).flatten.toSeq
134132
}
135133
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -912,8 +912,7 @@ case class Sort(
912912
override def maxRowsPerPartition: Option[Long] = {
913913
if (global) maxRows else child.maxRowsPerPartition
914914
}
915-
override def outputOrdering: Seq[SortOrder] =
916-
order ++ child.outputOrdering.filter(_.direction == Constant)
915+
override def outputOrdering: Seq[SortOrder] = order
917916
final override val nodePatterns: Seq[TreePattern] = Seq(SORT)
918917
override protected def withNewChildInternal(newChild: LogicalPlan): Sort = copy(child = newChild)
919918
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,9 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
4343
val sortOrder = direction match {
4444
case Ascending => BoundReference(0, dataType, nullable = true).asc
4545
case Descending => BoundReference(0, dataType, nullable = true).desc
46-
case Constant => BoundReference(0, dataType, nullable = true).const
4746
}
4847
val expectedCompareResult = direction match {
49-
case Ascending | Constant => signum(expected)
48+
case Ascending => signum(expected)
5049
case Descending => -1 * signum(expected)
5150
}
5251

sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,7 @@ case class SortExec(
4646

4747
override def output: Seq[Attribute] = child.output
4848

49-
override def outputOrdering: Seq[SortOrder] =
50-
sortOrder ++ child.outputOrdering.filter(_.direction == Constant)
49+
override def outputOrdering: Seq[SortOrder] = sortOrder
5150

5251
// sort performed is local within a given partition so will retain
5352
// child operator's partitioning
@@ -74,17 +73,15 @@ case class SortExec(
7473
* should make it public.
7574
*/
7675
def createSorter(): UnsafeExternalRowSorter = {
77-
val effectiveSortOrder = sortOrder.filterNot(_.direction == Constant)
78-
7976
rowSorter = new ThreadLocal[UnsafeExternalRowSorter]()
8077

8178
val ordering = RowOrdering.create(sortOrder, output)
8279

8380
// The comparator for comparing prefix
84-
val boundSortExpression = BindReferences.bindReference(effectiveSortOrder.head, output)
81+
val boundSortExpression = BindReferences.bindReference(sortOrder.head, output)
8582
val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
8683

87-
val canUseRadixSort = enableRadixSort && effectiveSortOrder.length == 1 &&
84+
val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
8885
SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
8986

9087
// The generator for prefix

sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,6 @@ object SortPrefixUtils {
6363
PrefixComparators.STRING_DESC_NULLS_FIRST
6464
case Descending =>
6565
PrefixComparators.STRING_DESC
66-
case Constant =>
67-
NoOpPrefixComparator
6866
}
6967
}
7068

@@ -78,8 +76,6 @@ object SortPrefixUtils {
7876
PrefixComparators.BINARY_DESC_NULLS_FIRST
7977
case Descending =>
8078
PrefixComparators.BINARY_DESC
81-
case Constant =>
82-
NoOpPrefixComparator
8379
}
8480
}
8581

@@ -93,8 +89,6 @@ object SortPrefixUtils {
9389
PrefixComparators.LONG_DESC_NULLS_FIRST
9490
case Descending =>
9591
PrefixComparators.LONG_DESC
96-
case Constant =>
97-
NoOpPrefixComparator
9892
}
9993
}
10094

@@ -108,8 +102,6 @@ object SortPrefixUtils {
108102
PrefixComparators.DOUBLE_DESC_NULLS_FIRST
109103
case Descending =>
110104
PrefixComparators.DOUBLE_DESC
111-
case Constant =>
112-
NoOpPrefixComparator
113105
}
114106
}
115107

0 commit comments

Comments
 (0)