Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ object FileFormatWriter extends Logging {
maxWriters: Int,
createSorter: () => UnsafeExternalRowSorter)

/**
* A variable used in tests to check whether the output ordering of the query matches the
* required ordering of the write command.
*/
private[sql] var outputOrderingMatched: Boolean = false

// scalastyle:off argcount
/**
* Basic work flow of this command is:
Expand Down Expand Up @@ -177,7 +171,15 @@ object FileFormatWriter extends Logging {
// 1) When the planned write config is disabled.
// 2) When the concurrent writers are enabled (in this case the required ordering of a
// V1 write command will be empty).
if (Utils.isTesting) outputOrderingMatched = orderingMatched
if (Utils.isTesting) {
if (SQLConf.get.plannedWriteEnabled && !orderingMatched) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we also need to check that concurrent writers are not enabled.

// When testing, fail with an exception if the ordering does not match, to improve test
// coverage. In production, an extra sort is injected when the ordering does not match, so a
// mismatch should only be a performance issue rather than a bug.
throw new IllegalStateException(
s"BUG: ordering should match when ${SQLConf.PLANNED_WRITE_ENABLED.key} is enabled.")
}
}

try {
val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ abstract class V1WriteCommandSuiteBase extends QueryTest with SQLTestUtils {
*/
protected def executeAndCheckOrdering(
hasLogicalSort: Boolean,
orderingMatched: Boolean,
hasEmpty2Null: Boolean = false)(query: => Unit): Unit = {
var optimizedPlan: LogicalPlan = null

Expand All @@ -75,10 +74,6 @@ abstract class V1WriteCommandSuiteBase extends QueryTest with SQLTestUtils {

query

// Check whether the output ordering is matched before FileFormatWriter executes rdd.
assert(FileFormatWriter.outputOrderingMatched == orderingMatched,
s"Expect: $orderingMatched, Actual: ${FileFormatWriter.outputOrderingMatched}")

sparkContext.listenerBus.waitUntilEmpty()

// Check whether a logical sort node is at the top of the logical plan of the write query.
Expand All @@ -104,9 +99,9 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio
import testImplicits._

test("v1 write without partition columns") {
withPlannedWrite { enabled =>
withPlannedWrite { _ =>
withTable("t") {
executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) {
executeAndCheckOrdering(hasLogicalSort = false) {
sql("CREATE TABLE t USING PARQUET AS SELECT * FROM t0")
}
}
Expand All @@ -116,7 +111,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio
test("v1 write with non-string partition columns") {
withPlannedWrite { enabled =>
withTable("t") {
executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) {
executeAndCheckOrdering(hasLogicalSort = enabled) {
sql("CREATE TABLE t USING PARQUET PARTITIONED BY (j) AS SELECT i, k, j FROM t0")
}
}
Expand All @@ -127,7 +122,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio
withPlannedWrite { enabled =>
withTable("t") {
executeAndCheckOrdering(
hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) {
hasLogicalSort = enabled, hasEmpty2Null = enabled) {
sql("CREATE TABLE t USING PARQUET PARTITIONED BY (k) AS SELECT * FROM t0")
}
}
Expand All @@ -144,7 +139,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio
|CLUSTERED BY (i, j) SORTED BY (j) INTO 2 BUCKETS
|""".stripMargin)
executeAndCheckOrdering(
hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) {
hasLogicalSort = enabled, hasEmpty2Null = enabled) {
sql("INSERT INTO t SELECT * FROM t0")
}
}
Expand All @@ -164,7 +159,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio
// sort orders. So the ordering will not match. This issue does not exist when
// planned write is enabled, because AQE will be applied on top of the write
// command instead of on top of the child query plan.
executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = enabled) {
executeAndCheckOrdering(hasLogicalSort = true) {
sql("INSERT INTO t SELECT i, k, j FROM t0 ORDER BY j")
}
}
Expand All @@ -180,7 +175,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio
|PARTITIONED BY (k STRING)
|""".stripMargin)
executeAndCheckOrdering(
hasLogicalSort = true, orderingMatched = enabled, hasEmpty2Null = enabled) {
hasLogicalSort = true, hasEmpty2Null = enabled) {
sql("INSERT INTO t SELECT * FROM t0 ORDER BY k")
}
}
Expand All @@ -191,7 +186,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio
withPlannedWrite { enabled =>
withTempPath { path =>
executeAndCheckOrdering(
hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) {
hasLogicalSort = enabled, hasEmpty2Null = enabled) {
Seq((0, None), (1, Some("")), (2, None), (3, Some("x")))
.toDF("id", "p")
.write
Expand Down Expand Up @@ -223,7 +218,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio
// SMJ to BHJ which will remove the original output ordering from the SMJ.
// In this case AQE should still add back the sort node from the logical plan
// during re-planning, and ordering should be matched in FileFormatWriter.
executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) {
executeAndCheckOrdering(hasLogicalSort = enabled) {
sql(
"""
|INSERT INTO t
Expand All @@ -247,7 +242,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio
|""".stripMargin)

// partition columns are static
executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) {
executeAndCheckOrdering(hasLogicalSort = false) {
sql(
"""
|INSERT INTO t PARTITION(p1=1, p2='a')
Expand All @@ -257,7 +252,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio

// one static partition column and one dynamic partition column
executeAndCheckOrdering(
hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) {
hasLogicalSort = enabled, hasEmpty2Null = enabled) {
sql(
"""
|INSERT INTO t PARTITION(p1=1, p2)
Expand All @@ -267,7 +262,7 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio

// partition columns are dynamic
executeAndCheckOrdering(
hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) {
hasLogicalSort = enabled, hasEmpty2Null = enabled) {
sql(
"""
|INSERT INTO t PARTITION(p1, p2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingleton {

test("create hive table as select - no partition column") {
withPlannedWrite { enabled =>
withPlannedWrite { _ =>
withTable("t") {
executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) {
executeAndCheckOrdering(hasLogicalSort = false) {
sql("CREATE TABLE t AS SELECT * FROM t0")
}
}
Expand All @@ -37,7 +37,7 @@ class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingl
withTable("t") {
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
executeAndCheckOrdering(
hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) {
hasLogicalSort = enabled, hasEmpty2Null = enabled) {
sql(
"""
|CREATE TABLE t
Expand All @@ -61,7 +61,7 @@ class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingl
|""".stripMargin)
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
executeAndCheckOrdering(
hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) {
hasLogicalSort = enabled, hasEmpty2Null = enabled) {
sql("INSERT INTO t SELECT * FROM t0")
}
}
Expand All @@ -80,7 +80,7 @@ class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingl
|AS SELECT * FROM t0
|""".stripMargin)
executeAndCheckOrdering(
hasLogicalSort = enabled, orderingMatched = enabled, hasEmpty2Null = enabled) {
hasLogicalSort = enabled, hasEmpty2Null = enabled) {
sql("INSERT OVERWRITE t SELECT j AS i, i AS j, k FROM t0")
}
}
Expand All @@ -89,15 +89,15 @@ class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingl
}

test("insert into hive table with static partitions only") {
withPlannedWrite { enabled =>
withPlannedWrite { _ =>
withTable("t") {
sql(
"""
|CREATE TABLE t (i INT, j INT)
|PARTITIONED BY (k STRING)
|""".stripMargin)
// No dynamic partition so no sort is needed.
executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) {
executeAndCheckOrdering(hasLogicalSort = false) {
sql("INSERT INTO t PARTITION (k='0') SELECT i, j FROM t0 WHERE k = '0'")
}
}
Expand Down