diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 794d90b242c1..ae5c63ca39f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -55,12 +55,6 @@ object FileFormatWriter extends Logging { maxWriters: Int, createSorter: () => UnsafeExternalRowSorter) - /** - * A variable used in tests to check whether the output ordering of the query matches the - * required ordering of the write command. - */ - private[sql] var outputOrderingMatched: Boolean = false - // scalastyle:off argcount /** * Basic work flow of this command is: @@ -177,7 +171,15 @@ object FileFormatWriter extends Logging { // 1) When the planned write config is disabled. // 2) When the concurrent writers are enabled (in this case the required ordering of a // V1 write command will be empty). - if (Utils.isTesting) outputOrderingMatched = orderingMatched + if (Utils.isTesting) { + if (SQLConf.get.plannedWriteEnabled && !orderingMatched) { + // When testing, throw an exception if the ordering does not match to improve coverage. + // In production, we will inject an extra sort if the ordering does not match so it + // should be only a performance issue rather than a bug. + throw new IllegalStateException( + s"BUG: ordering should match when ${SQLConf.PLANNED_WRITE_ENABLED.key} is enabled.") + } + } try { val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala index c18396b554d7..76a7808f30fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala @@ -57,7 +57,6 @@ abstract class V1WriteCommandSuiteBase extends QueryTest with SQLTestUtils { */ protected def executeAndCheckOrdering( hasLogicalSort: Boolean, - orderingMatched: Boolean, hasEmpty2Null: Boolean = false)(query: => Unit): Unit = { var optimizedPlan: LogicalPlan = null @@ -75,10 +74,6 @@ abstract class V1WriteCommandSuiteBase extends QueryTest with SQLTestUtils { query - // Check whether the output ordering is matched before FileFormatWriter executes rdd. - assert(FileFormatWriter.outputOrderingMatched == orderingMatched, - s"Expect: $orderingMatched, Actual: ${FileFormatWriter.outputOrderingMatched}") - sparkContext.listenerBus.waitUntilEmpty() // Check whether a logical sort node is at the top of the logical plan of the write query. @@ -104,9 +99,9 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio import testImplicits._ test("v1 write without partition columns") { - withPlannedWrite { enabled => + withPlannedWrite { _ => withTable("t") { - executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) { + executeAndCheckOrdering(hasLogicalSort = false) { sql("CREATE TABLE t USING PARQUET AS SELECT * FROM t0") } } @@ -116,7 +111,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio test("v1 write with non-string partition columns") { withPlannedWrite { enabled => withTable("t") { - executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + executeAndCheckOrdering(hasLogicalSort = enabled) { sql("CREATE TABLE t USING PARQUET PARTITIONED BY (j) AS SELECT i, k, j FROM t0") } } @@ -127,7 +122,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio withPlannedWrite { enabled => withTable("t") { executeAndCheckOrdering( - hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) { + hasLogicalSort = enabled, hasEmpty2Null = enabled) { sql("CREATE TABLE t USING PARQUET PARTITIONED BY (k) AS SELECT * FROM t0") } } @@ -144,7 +139,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio |CLUSTERED BY (i, j) SORTED BY (j) INTO 2 BUCKETS |""".stripMargin) executeAndCheckOrdering( - hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) { + hasLogicalSort = enabled, hasEmpty2Null = enabled) { sql("INSERT INTO t SELECT * FROM t0") } } @@ -164,7 +159,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio // sort orders. So the ordering will not match. This issue does not exist when // planned write is enabled, because AQE will be applied on top of the write // command instead of on top of the child query plan. - executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = enabled) { + executeAndCheckOrdering(hasLogicalSort = true) { sql("INSERT INTO t SELECT i, k, j FROM t0 ORDER BY j") } } @@ -180,7 +175,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio |PARTITIONED BY (k STRING) |""".stripMargin) executeAndCheckOrdering( - hasLogicalSort = true, orderingMatched = enabled, hasEmpty2Null = enabled) { + hasLogicalSort = true, hasEmpty2Null = enabled) { sql("INSERT INTO t SELECT * FROM t0 ORDER BY k") } } @@ -191,7 +186,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio withPlannedWrite { enabled => withTempPath { path => executeAndCheckOrdering( - hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) { + hasLogicalSort = enabled, hasEmpty2Null = enabled) { Seq((0, None), (1, Some("")), (2, None), (3, Some("x"))) .toDF("id", "p") .write @@ -223,7 +218,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio // SMJ to BHJ which will remove the original output ordering from the SMJ. // In this case AQE should still add back the sort node from the logical plan // during re-planning, and ordering should be matched in FileFormatWriter. - executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + executeAndCheckOrdering(hasLogicalSort = enabled) { sql( """ |INSERT INTO t @@ -247,7 +242,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio |""".stripMargin) // partition columns are static - executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) { + executeAndCheckOrdering(hasLogicalSort = false) { sql( """ |INSERT INTO t PARTITION(p1=1, p2='a') @@ -257,7 +252,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio // one static partition column and one dynamic partition column executeAndCheckOrdering( - hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) { + hasLogicalSort = enabled, hasEmpty2Null = enabled) { sql( """ |INSERT INTO t PARTITION(p1=1, p2) @@ -267,7 +262,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio // partition columns are dynamic executeAndCheckOrdering( - hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) { + hasLogicalSort = enabled, hasEmpty2Null = enabled) { sql( """ |INSERT INTO t PARTITION(p1, p2) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala index 364b79717300..5bf65f2e881c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala @@ -23,9 +23,9 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingleton { test("create hive table as select - no partition column") { - withPlannedWrite { enabled => + withPlannedWrite { _ => withTable("t") { - executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) { + executeAndCheckOrdering(hasLogicalSort = false) { sql("CREATE TABLE t AS SELECT * FROM t0") } } @@ -37,7 +37,7 @@ class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingl withTable("t") { withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { executeAndCheckOrdering( - hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) { + hasLogicalSort = enabled, hasEmpty2Null = enabled) { sql( """ |CREATE TABLE t @@ -61,7 +61,7 @@ class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingl |""".stripMargin) withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { executeAndCheckOrdering( - hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) { + hasLogicalSort = enabled, hasEmpty2Null = enabled) { sql("INSERT INTO t SELECT * FROM t0") } } @@ -80,7 +80,7 @@ class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingl |AS SELECT * FROM t0 |""".stripMargin) executeAndCheckOrdering( - hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) { + hasLogicalSort = enabled, hasEmpty2Null = enabled) { sql("INSERT OVERWRITE t SELECT j AS i, i AS j, k FROM t0") } } @@ -89,7 +89,7 @@ class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingl } test("insert into hive table with static partitions only") { - withPlannedWrite { enabled => + withPlannedWrite { _ => withTable("t") { sql( """ @@ -97,7 +97,7 @@ class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingl |PARTITIONED BY (k STRING) |""".stripMargin) // No dynamic partition so no sort is needed. - executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) { + executeAndCheckOrdering(hasLogicalSort = false) { sql("INSERT INTO t PARTITION (k='0') SELECT i, j FROM t0 WHERE k = '0'") } }