
Commit f33d8aa

pan3793 authored and peter-toth committed
[SPARK-53738][SQL] Fix planned write when query output contains foldable orderings
### What changes were proposed in this pull request?

This is the second try of #52474, following [the suggestion from cloud-fan](#52474 (comment)).

This PR fixes a bug in `plannedWrite` where the `query` has foldable orderings in the partition columns:

```sql
CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k);
INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i;
```

The evaluation of `FileFormatWriter.orderingMatched` fails because `SortOrder(Literal)` is eliminated by `EliminateSorts`.

### Why are the changes needed?

`V1Writes` overrides the custom sort order when the query's output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contained literals in the partition columns, the check produced a false negative, so the custom sort order did not take effect. SPARK-53707 partially fixed the issue on the logical plan by adding a `Project` over the query in `V1Writes`.

Before SPARK-53707:

```
Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false
+- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282]
   +- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet
```

After SPARK-53707:

```
Project [i#284, j#285, 0 AS k#290]
+- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false
   +- Project [i#284, j#285]
      +- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet
```

Note that the issue still exists after SPARK-53707, because `FileFormatWriter` checks the ordering match again in another place. This PR fixes the issue thoroughly, with new UTs added.

### Does this PR introduce _any_ user-facing change?

Yes, it's a bug fix.

### How was this patch tested?

New UTs are added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52584 from pan3793/SPARK-53738-rework.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
1 parent: 9f32542
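To make the failure mode concrete, here is a minimal sketch (not part of this commit) of the mechanism: a sort key that is foldable carries no ordering information, so the optimizer drops it, and any ordering check that still expects the literal key yields a false negative. Constructors and rule objects below mirror the Catalyst source, but treat exact signatures as assumptions:

```scala
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Literal, SortOrder}
import org.apache.spark.sql.catalyst.optimizer.{EliminateSorts, FoldablePropagation}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Sort}
import org.apache.spark.sql.types.IntegerType

val i = AttributeReference("i", IntegerType)()
val j = AttributeReference("j", IntegerType)()
val relation = LocalRelation(i, j)

// Required ordering inferred from the table spec: the partition column k is
// the literal '0', followed by the user-specified sort column i.
val required = Seq(SortOrder(Literal("0"), Ascending), SortOrder(i, Ascending))

// Wrap the ordering in a fake Sort node (as the new eliminateFoldableOrdering
// helper does) and run the same optimizer rules: the foldable key is removed,
// leaving an effective required ordering of just `i ASC`.
val effective = EliminateSorts(FoldablePropagation(Sort(required, global = false, relation)))
// effective.outputOrdering now contains only SortOrder(i, Ascending)
```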

4 files changed: +118 −16 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 14 additions & 5 deletions

@@ -439,9 +439,7 @@ case class InMemoryRelation(
   override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)

   override def doCanonicalize(): logical.LogicalPlan =
-    copy(output = output.map(QueryPlan.normalizeExpressions(_, output)),
-      cacheBuilder,
-      outputOrdering)
+    withOutput(output.map(QueryPlan.normalizeExpressions(_, output)))

   @transient val partitionStatistics = new PartitionStatistics(output)

@@ -469,8 +467,13 @@ case class InMemoryRelation(
     }
   }

-  def withOutput(newOutput: Seq[Attribute]): InMemoryRelation =
-    InMemoryRelation(newOutput, cacheBuilder, outputOrdering, statsOfPlanToCache)
+  def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
+    val map = AttributeMap(output.zip(newOutput))
+    val newOutputOrdering = outputOrdering
+      .map(_.transform { case a: Attribute => map(a) })
+      .asInstanceOf[Seq[SortOrder]]
+    InMemoryRelation(newOutput, cacheBuilder, newOutputOrdering, statsOfPlanToCache)
+  }

   override def newInstance(): this.type = {
     InMemoryRelation(

@@ -487,6 +490,12 @@ case class InMemoryRelation(
     cloned
   }

+  override def makeCopy(newArgs: Array[AnyRef]): LogicalPlan = {
+    val copied = super.makeCopy(newArgs).asInstanceOf[InMemoryRelation]
+    copied.statsOfPlanToCache = this.statsOfPlanToCache
+    copied
+  }
+
   override def simpleString(maxFields: Int): String =
     s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}], ${cacheBuilder.storageLevel}"
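The `withOutput` change above matters because the relation's `outputOrdering` refers to the old output attributes by `ExprId`; substituting a new output without remapping would leave the ordering pointing at attributes the relation no longer exposes. A standalone sketch of the same remapping pattern (the helper name is hypothetical, and `getOrElse` is used here defensively where the diff indexes the map directly):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, SortOrder}

// Rewrite every attribute in the sort orders from the old output to the
// positionally corresponding attribute of the new output.
def remapOrdering(
    ordering: Seq[SortOrder],
    oldOutput: Seq[Attribute],
    newOutput: Seq[Attribute]): Seq[SortOrder] = {
  val map = AttributeMap(oldOutput.zip(newOutput))
  ordering
    .map(_.transform { case a: Attribute => map.getOrElse(a, a) })
    .asInstanceOf[Seq[SortOrder]]
}
```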

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala

Lines changed: 19 additions & 7 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Empty2Null, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder}
+import org.apache.spark.sql.catalyst.optimizer.{EliminateSorts, FoldablePropagation}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule

@@ -97,13 +98,15 @@ object V1Writes extends Rule[LogicalPlan] {
     assert(empty2NullPlan.output.length == query.output.length)
     val attrMap = AttributeMap(query.output.zip(empty2NullPlan.output))

-    // Rewrite the attribute references in the required ordering to use the new output.
-    val requiredOrdering = write.requiredOrdering.map(_.transform {
-      case a: Attribute => attrMap.getOrElse(a, a)
-    }.asInstanceOf[SortOrder])
-    val outputOrdering = empty2NullPlan.outputOrdering
-    val orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)
-    if (orderingMatched) {
+    // Rewrite the attribute references in the required ordering to use the new output,
+    // then eliminate foldable ordering.
+    val requiredOrdering = {
+      val ordering = write.requiredOrdering.map(_.transform {
+        case a: Attribute => attrMap.getOrElse(a, a)
+      }.asInstanceOf[SortOrder])
+      eliminateFoldableOrdering(ordering, empty2NullPlan).outputOrdering
+    }
+    if (isOrderingMatched(requiredOrdering.map(_.child), empty2NullPlan.outputOrdering)) {
       empty2NullPlan
     } else {
       Sort(requiredOrdering, global = false, empty2NullPlan)

@@ -199,6 +202,15 @@ object V1WritesUtils {
     expressions.exists(_.exists(_.isInstanceOf[Empty2Null]))
   }

+  // SPARK-53738: the required ordering inferred from the table spec (partition, bucketing,
+  // etc.) may contain foldable sort ordering expressions, which causes the optimized query's
+  // output ordering to mismatch. Here we calculate the required ordering more accurately, by
+  // creating a fake Sort node over the input query, then removing the foldable sort ordering
+  // expressions.
+  def eliminateFoldableOrdering(ordering: Seq[SortOrder], query: LogicalPlan): LogicalPlan =
+    EliminateSorts(FoldablePropagation(Sort(ordering, global = false, query)))
+
+  // The comparison ignores SortDirection and NullOrdering since it doesn't matter
+  // for writing cases.
   def isOrderingMatched(
       requiredOrdering: Seq[Expression],
       outputOrdering: Seq[SortOrder]): Boolean = {
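For context, `isOrderingMatched` (whose signature closes the hunk above) treats the write as already sorted when the required expressions form a semantic prefix of the plan's output ordering. A simplified stand-in, based on the comment in the diff rather than the exact Spark implementation:

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}

// Simplified stand-in: SortDirection and NullOrdering are deliberately
// ignored because they don't affect which file/partition a row lands in,
// only the order of rows within it.
def orderingMatchedSketch(
    requiredOrdering: Seq[Expression],
    outputOrdering: Seq[SortOrder]): Boolean = {
  if (requiredOrdering.length > outputOrdering.length) {
    false
  } else {
    requiredOrdering.zip(outputOrdering).forall {
      case (required, output) => output.child.semanticEquals(required)
    }
  }
}
```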

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala

Lines changed: 51 additions & 4 deletions

@@ -63,10 +63,23 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
       hasLogicalSort: Boolean,
       orderingMatched: Boolean,
       hasEmpty2Null: Boolean = false)(query: => Unit): Unit = {
-    var optimizedPlan: LogicalPlan = null
+    executeAndCheckOrderingAndCustomValidate(
+      hasLogicalSort, Some(orderingMatched), hasEmpty2Null)(query)(_ => ())
+  }
+
+  /**
+   * Execute a write query and check ordering of the plan, then do custom validation
+   */
+  protected def executeAndCheckOrderingAndCustomValidate(
+      hasLogicalSort: Boolean,
+      orderingMatched: Option[Boolean],
+      hasEmpty2Null: Boolean = false)(query: => Unit)(
+      customValidate: LogicalPlan => Unit): Unit = {
+    @volatile var optimizedPlan: LogicalPlan = null

     val listener = new QueryExecutionListener {
       override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+        val conf = qe.sparkSession.sessionState.conf
         qe.optimizedPlan match {
           case w: V1WriteCommand =>
             if (hasLogicalSort && conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) {

@@ -85,9 +98,12 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper

     query

-    // Check whether the output ordering is matched before FileFormatWriter executes rdd.
-    assert(FileFormatWriter.outputOrderingMatched == orderingMatched,
-      s"Expect: $orderingMatched, Actual: ${FileFormatWriter.outputOrderingMatched}")
+    orderingMatched.foreach { matched =>
+      // Check whether the output ordering is matched before FileFormatWriter executes rdd.
+      assert(FileFormatWriter.outputOrderingMatched == matched,
+        s"Expect orderingMatched: $matched, " +
+          s"Actual: ${FileFormatWriter.outputOrderingMatched}")
+    }

     sparkContext.listenerBus.waitUntilEmpty()

@@ -103,6 +119,8 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
     assert(empty2nullExpr == hasEmpty2Null,
       s"Expect hasEmpty2Null: $hasEmpty2Null, Actual: $empty2nullExpr. Plan:\n$optimizedPlan")

+    customValidate(optimizedPlan)
+
     spark.listenerManager.unregister(listener)
   }
 }

@@ -391,4 +409,33 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
       }
     }
   }
+
+  test("v1 write with sort by literal column preserve custom order") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t(i INT, j INT, k STRING) USING PARQUET
+            |PARTITIONED BY (k)
+            |""".stripMargin)
+        // Skip checking orderingMatched temporarily to avoid touching `FileFormatWriter`,
+        // see details at https://github.com/apache/spark/pull/52584#issuecomment-3407716019
+        executeAndCheckOrderingAndCustomValidate(
+          hasLogicalSort = true, orderingMatched = None) {
+          sql(
+            """
+              |INSERT OVERWRITE t
+              |SELECT i, j, '0' as k FROM t0 SORT BY k, i
+              |""".stripMargin)
+        } { optimizedPlan =>
+          assert {
+            optimizedPlan.outputOrdering.exists {
+              case SortOrder(attr: AttributeReference, _, _, _) => attr.name == "i"
+              case _ => false
+            }
+          }
+        }
+      }
+    }
+  }
 }

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala

Lines changed: 34 additions & 0 deletions

@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.execution.command

 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SortOrder}
 import org.apache.spark.sql.execution.datasources.V1WriteCommandSuiteBase
 import org.apache.spark.sql.hive.HiveUtils._
 import org.apache.spark.sql.hive.test.TestHiveSingleton

@@ -126,4 +127,37 @@ class V1WriteHiveCommandSuite
       }
     }
   }
+
+  test("v1 write to hive table with sort by literal column preserve custom order") {
+    withCovnertMetastore { _ =>
+      withPlannedWrite { enabled =>
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          withTable("t") {
+            sql(
+              """
+                |CREATE TABLE t(i INT, j INT, k STRING) STORED AS PARQUET
+                |PARTITIONED BY (k)
+                |""".stripMargin)
+            // Skip checking orderingMatched temporarily to avoid touching `FileFormatWriter`,
+            // see details at https://github.com/apache/spark/pull/52584#issuecomment-3407716019
+            executeAndCheckOrderingAndCustomValidate(
+              hasLogicalSort = true, orderingMatched = None) {
+              sql(
+                """
+                  |INSERT OVERWRITE t
+                  |SELECT i, j, '0' as k FROM t0 SORT BY k, i
+                  |""".stripMargin)
+            } { optimizedPlan =>
+              assert {
+                optimizedPlan.outputOrdering.exists {
+                  case SortOrder(attr: AttributeReference, _, _, _) => attr.name == "i"
+                  case _ => false
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 }
