diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala index 3cbe1654ea2c..3ba8745be995 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala @@ -65,7 +65,11 @@ trait AliasAwareOutputOrdering extends AliasAwareOutputExpression { final override def outputOrdering: Seq[SortOrder] = { if (hasAlias) { - orderingExpressions.map(normalizeExpression(_).asInstanceOf[SortOrder]) + orderingExpressions.map { sortOrder => + val newSortOrder = normalizeExpression(sortOrder).asInstanceOf[SortOrder] + val newSameOrderExpressions = newSortOrder.sameOrderExpressions.map(normalizeExpression) + newSortOrder.copy(sameOrderExpressions = newSameOrderExpressions) + } } else { orderingExpressions } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 7db94a702488..d147876c0f17 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -1059,6 +1059,37 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } } + test("SPARK-33400: Normalization of sortOrder should take care of sameOrderExprs") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withTempView("t1", "t2", "t3") { + spark.range(10).repartition($"id").createTempView("t1") + spark.range(20).repartition($"id").createTempView("t2") + spark.range(30).repartition($"id").createTempView("t3") + val planned = sql( + """ + |SELECT t2id, t3.id as t3id + |FROM ( + | SELECT t1.id as t1id, t2.id as t2id + | FROM t1, t2 + | WHERE t1.id = t2.id + |) t12, t3 + |WHERE t2id = t3.id + """.stripMargin).queryExecution.executedPlan + + val sortNodes = planned.collect { case s: SortExec => s } + assert(sortNodes.size == 3) + + val projects = planned.collect { case p: ProjectExec => p } + assert(projects.exists(_.outputOrdering match { + case Seq(SortOrder(_, Ascending, NullsFirst, sameOrderExprs)) => + sameOrderExprs.size == 1 && sameOrderExprs.head.isInstanceOf[AttributeReference] && + sameOrderExprs.head.asInstanceOf[AttributeReference].name == "t2id" + case _ => false + })) + } + } + } + test("aliases to expressions should not be replaced") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { withTempView("df1", "df2") {