Skip to content

Commit b4b02bf

Browse files
committed
Add fix
1 parent 372466b commit b4b02bf

File tree

2 files changed

+10
-4
lines changed

2 files changed

+10
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,9 +234,15 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
234234

235235
// Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
236236
children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
237+
logDebug(s"Checking if sort of ${requiredOrdering} needed on ${child.simpleString}")
237238
if (requiredOrdering.nonEmpty) {
238239
// If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
239-
if (requiredOrdering != child.outputOrdering.take(requiredOrdering.length)) {
240+
val orderings = requiredOrdering zip child.outputOrdering
241+
val needSort = orderings.length != requiredOrdering.length ||
242+
orderings.exists { case (requiredOrder, childOrder) =>
243+
!requiredOrder.semanticEquals(childOrder)
244+
}
245+
if (needSort) {
240246
SortExec(requiredOrdering, global = false, child = child)
241247
} else {
242248
child

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ class PlannerSuite extends SharedSQLContext {
479479
}
480480
}
481481

482-
ignore("AIQ-2170 EnsureRequirements adds sort when ordering columns same but diff direction") {
482+
test("EnsureRequirements adds sort when ordering columns same but diff direction") {
483483
val orderingA = SortOrder(Literal(1), Ascending)
484484
val orderingB = SortOrder(Literal(1), Descending)
485485
assert(orderingA != orderingB)
@@ -495,7 +495,7 @@ class PlannerSuite extends SharedSQLContext {
495495
}
496496
}
497497

498-
ignore("AIQ-2170 EnsureRequirements doesn't add sort with cached sorted table") {
498+
test("EnsureRequirements doesn't add sort with cached sorted table") {
499499
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
500500
withTempTable("t1", "t2") {
501501
val df = Seq(
@@ -524,7 +524,7 @@ class PlannerSuite extends SharedSQLContext {
524524
}
525525
}
526526

527-
ignore("AIQ-2170 EnsureRequirements doesn't add sort with different column capitalization") {
527+
test("EnsureRequirements doesn't add sort with different column capitalization") {
528528
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
529529
withTempTable("t1", "t2") {
530530
val df = Seq(

0 commit comments

Comments
 (0)