From f486f184550bff9d187376f27cb12cbb21602c0a Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Tue, 5 Jul 2022 23:21:00 -0700 Subject: [PATCH 1/6] v1 writes --- .../apache/spark/sql/internal/SQLConf.scala | 10 ++ .../spark/sql/execution/SparkOptimizer.scala | 4 +- .../command/createDataSourceTables.scala | 16 ++- .../execution/datasources/DataSource.scala | 39 +++-- .../datasources/FileFormatWriter.scala | 36 +---- .../InsertIntoHadoopFsRelationCommand.scala | 7 +- .../sql/execution/datasources/V1Writes.scala | 134 ++++++++++++++++++ .../datasources/V1WriteCommandSuite.scala | 108 ++++++++++++++ .../CreateHiveTableAsSelectCommand.scala | 20 ++- .../hive/execution/InsertIntoHiveTable.scala | 69 ++------- .../sql/hive/execution/SaveAsHiveFile.scala | 8 +- .../hive/execution/V1WritesHiveUtils.scala | 108 ++++++++++++++ .../command/V1WriteHiveCommandSuite.scala | 71 ++++++++++ 13 files changed, 517 insertions(+), 113 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1b7857ead59fa..72720f329a1a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3781,6 +3781,14 @@ object SQLConf { .intConf .createWithDefault(0) + val PLANNED_WRITE_ENABLED = buildConf("spark.sql.plannedWrite.enabled") + .internal() + .doc("When set to true, Spark adds logical sorts to V1 write commands if needed so that " + + "`FileFormatWriter` does not need to insert physical sorts.") + .version("3.2.0") + .booleanConf + .createWithDefault(false) + val INFER_NESTED_DICT_AS_STRUCT = buildConf("spark.sql.pyspark.inferNestedDictAsStruct.enabled") .doc("PySpark's SparkSession.createDataFrame infers the nested dict as a map by default. 
" + "When it set to true, it infers the nested dict as a struct.") @@ -4617,6 +4625,8 @@ class SQLConf extends Serializable with Logging { def maxConcurrentOutputFileWriters: Int = getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS) + def plannedWriteEnabled: Boolean = getConf(SQLConf.PLANNED_WRITE_ENABLED) + def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT) def legacyInferArrayTypeFromFirstElement: Boolean = getConf( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index bffa1d1dae790..72bdab409a9e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -23,8 +23,7 @@ import org.apache.spark.sql.catalyst.optimizer._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.CatalogManager -import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions -import org.apache.spark.sql.execution.datasources.SchemaPruning +import org.apache.spark.sql.execution.datasources.{PruneFileSourcePartitions, SchemaPruning, V1Writes} import org.apache.spark.sql.execution.datasources.v2.{GroupBasedRowLevelOperationScanPlanning, OptimizeMetadataOnlyDeleteFromTable, V2ScanPartitioningAndOrdering, V2ScanRelationPushDown, V2Writes} import org.apache.spark.sql.execution.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning} import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs} @@ -39,6 +38,7 @@ class SparkOptimizer( // TODO: move SchemaPruning into catalyst Seq(SchemaPruning) :+ GroupBasedRowLevelOperationScanPlanning :+ + V1Writes :+ V2ScanRelationPushDown :+ V2ScanPartitioningAndOrdering :+ V2Writes :+ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index e64426f8de8f3..7847727ccf841 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -20,7 +20,9 @@ package org.apache.spark.sql.execution.command import java.net.URI import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.errors.QueryCompilationErrors @@ -141,7 +143,19 @@ case class CreateDataSourceTableAsSelectCommand( mode: SaveMode, query: LogicalPlan, outputColumnNames: Seq[String]) - extends DataWritingCommand { + extends V1WriteCommand { + + override def requiredOrdering: Seq[SortOrder] = { + val unresolvedPartitionColumns = table.partitionColumnNames.map(UnresolvedAttribute.quoted) + val partitionColumns = DataSource.resolvePartitionColumns( + unresolvedPartitionColumns, + outputColumns, + query, + SparkSession.active.sessionState.conf.resolver) + // We do not need the path option from the table location to get writer bucket spec. 
+ val options = table.storage.properties + V1WritesUtils.getSortOrder(outputColumns, partitionColumns, table.bucketSpec, options) + } override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { assert(table.tableType != CatalogTableType.VIEW) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index a220fd334a5ce..8f8846b89f3f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -29,8 +29,9 @@ import org.apache.spark.SparkException import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils} import org.apache.spark.sql.connector.catalog.TableProvider @@ -519,18 +520,8 @@ case class DataSource( case format: FileFormat => disallowWritingIntervals(outputColumns.map(_.dataType), forbidAnsiIntervals = false) val cmd = planForWritingFileFormat(format, mode, data) - val resolvedPartCols = cmd.partitionColumns.map { col => - // The partition columns created in `planForWritingFileFormat` should always be - // `UnresolvedAttribute` with a single name part. - assert(col.isInstanceOf[UnresolvedAttribute]) - val unresolved = col.asInstanceOf[UnresolvedAttribute] - assert(unresolved.nameParts.length == 1) - val name = unresolved.nameParts.head - outputColumns.find(a => equality(a.name, name)).getOrElse { - throw QueryCompilationErrors.cannotResolveAttributeError( - name, data.output.map(_.name).mkString(", ")) - } - } + val resolvedPartCols = + DataSource.resolvePartitionColumns(cmd.partitionColumns, outputColumns, data, equality) val resolved = cmd.copy( partitionColumns = resolvedPartCols, outputColumnNames = outputColumnNames) @@ -836,4 +827,26 @@ object DataSource extends Logging { throw QueryCompilationErrors.writeEmptySchemasUnsupportedByDataSourceError() } } + + /** + * Resolve partition columns using output columns of the query plan. + */ + def resolvePartitionColumns( + partitionColumns: Seq[Attribute], + outputColumns: Seq[Attribute], + plan: LogicalPlan, + resolver: Resolver): Seq[Attribute] = { + partitionColumns.map { col => + // The partition columns created in `planForWritingFileFormat` should always be + // `UnresolvedAttribute` with a single name part. 
+ assert(col.isInstanceOf[UnresolvedAttribute]) + val unresolved = col.asInstanceOf[UnresolvedAttribute] + assert(unresolved.nameParts.length == 1) + val name = unresolved.nameParts.head + outputColumns.find(a => resolver(a.name, name)).getOrElse { + throw QueryCompilationErrors.cannotResolveAttributeError( + name, plan.output.map(_.name).mkString(", ")) + } + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index e75041b7fb0b2..59ba7c592eb5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter} @@ -126,38 +125,8 @@ object FileFormatWriter extends Logging { } val empty2NullPlan = if (needConvert) ProjectExec(projectList, plan) else plan - val writerBucketSpec = bucketSpec.map { spec => - val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get) - - if (options.getOrElse(BucketingUtils.optionForHiveCompatibleBucketWrite, "false") == - "true") { - // Hive bucketed table: use `HiveHash` and bitwise-and as bucket id expression. - // Without the extra bitwise-and operation, we can get wrong bucket id when hash value of - // columns is negative. See Hive implementation in - // `org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#getBucketNumber()`. - val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue)) - val bucketIdExpression = Pmod(hashId, Literal(spec.numBuckets)) - - // The bucket file name prefix is following Hive, Presto and Trino conversion, so this - // makes sure Hive bucketed table written by Spark, can be read by other SQL engines. - // - // Hive: `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`. - // Trino: `io.trino.plugin.hive.BackgroundHiveSplitLoader#BUCKET_PATTERNS`. - val fileNamePrefix = (bucketId: Int) => f"$bucketId%05d_0_" - WriterBucketSpec(bucketIdExpression, fileNamePrefix) - } else { - // Spark bucketed table: use `HashPartitioning.partitionIdExpression` as bucket id - // expression, so that we can guarantee the data distribution is same between shuffle and - // bucketed data source, which enables us to only shuffle one side when join a bucketed - // table and a normal one. 
- val bucketIdExpression = HashPartitioning(bucketColumns, spec.numBuckets) - .partitionIdExpression - WriterBucketSpec(bucketIdExpression, (_: Int) => "") - } - } - val sortColumns = bucketSpec.toSeq.flatMap { - spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get) - } + val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, dataColumns, options) + val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns) val caseInsensitiveOptions = CaseInsensitiveMap(options) @@ -211,6 +180,7 @@ object FileFormatWriter extends Logging { try { val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) { + logInfo(s"Output ordering is matched for write job ${description.uuid}") (empty2NullPlan.execute(), None) } else { // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index d773d4bd271b3..e20d9ed8b537a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} @@ -57,7 +57,7 @@ case class InsertIntoHadoopFsRelationCommand( catalogTable: Option[CatalogTable], fileIndex: Option[FileIndex], outputColumnNames: Seq[String]) - extends DataWritingCommand { + extends V1WriteCommand { private lazy val parameters = CaseInsensitiveMap(options) @@ -74,6 +74,9 @@ case class InsertIntoHadoopFsRelationCommand( staticPartitions.size < partitionColumns.length } + override def requiredOrdering: Seq[SortOrder] = + V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options) + override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { // Most formats don't do well with duplicate columns, so lets not allow that SchemaUtils.checkColumnNameDuplication( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala new file mode 100644 index 0000000000000..bb202f48f29fd --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.catalyst.SQLConfHelper +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, BitwiseAnd, HiveHash, Literal, Pmod, SortOrder} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} +import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.DataWritingCommand +import org.apache.spark.sql.internal.SQLConf + +trait V1WriteCommand extends DataWritingCommand { + // Specify the required ordering for the V1 write command. `FileFormatWriter` will + // add SortExec if necessary when the requiredOrdering is empty. + def requiredOrdering: Seq[SortOrder] +} + +/** + * A rule that adds logical sorts to V1 data writing commands. + */ +object V1Writes extends Rule[LogicalPlan] with SQLConfHelper { + override def apply(plan: LogicalPlan): LogicalPlan = { + if (conf.plannedWriteEnabled) { + plan.transformDown { + case write: V1WriteCommand => + val newQuery = prepareQuery(write, write.query) + write.withNewChildren(newQuery :: Nil) + } + } else { + plan + } + } + + private def prepareQuery(write: V1WriteCommand, query: LogicalPlan): LogicalPlan = { + val requiredOrdering = write.requiredOrdering + val outputOrdering = query.outputOrdering + val orderingMatched = if (requiredOrdering.length > outputOrdering.length) { + false + } else { + requiredOrdering.zip(outputOrdering).forall { + case (requiredOrder, outputOrder) => requiredOrder.semanticEquals(outputOrder) + } + } + if (orderingMatched) { + query + } else { + // TODO: check if we need Empty2Null conversion before sort. + Sort(requiredOrdering, global = false, query) + } + } +} + +object V1WritesUtils { + + def getWriterBucketSpec( + bucketSpec: Option[BucketSpec], + dataColumns: Seq[Attribute], + options: Map[String, String]): Option[WriterBucketSpec] = { + bucketSpec.map { spec => + val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get) + + if (options.getOrElse(BucketingUtils.optionForHiveCompatibleBucketWrite, "false") == + "true") { + // Hive bucketed table: use `HiveHash` and bitwise-and as bucket id expression. + // Without the extra bitwise-and operation, we can get wrong bucket id when hash value of + // columns is negative. See Hive implementation in + // `org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#getBucketNumber()`. + val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue)) + val bucketIdExpression = Pmod(hashId, Literal(spec.numBuckets)) + + // The bucket file name prefix is following Hive, Presto and Trino conversion, so this + // makes sure Hive bucketed table written by Spark, can be read by other SQL engines. + // + // Hive: `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`. + // Trino: `io.trino.plugin.hive.BackgroundHiveSplitLoader#BUCKET_PATTERNS`. 
+ val fileNamePrefix = (bucketId: Int) => f"$bucketId%05d_0_" + WriterBucketSpec(bucketIdExpression, fileNamePrefix) + } else { + // Spark bucketed table: use `HashPartitioning.partitionIdExpression` as bucket id + // expression, so that we can guarantee the data distribution is same between shuffle and + // bucketed data source, which enables us to only shuffle one side when join a bucketed + // table and a normal one. + val bucketIdExpression = HashPartitioning(bucketColumns, spec.numBuckets) + .partitionIdExpression + WriterBucketSpec(bucketIdExpression, (_: Int) => "") + } + } + } + + def getBucketSortColumns( + bucketSpec: Option[BucketSpec], + dataColumns: Seq[Attribute]): Seq[Attribute] = { + bucketSpec.toSeq.flatMap { + spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get) + } + } + + def getSortOrder( + outputColumns: Seq[Attribute], + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String]): Seq[SortOrder] = { + val partitionSet = AttributeSet(partitionColumns) + val dataColumns = outputColumns.filterNot(partitionSet.contains) + val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, dataColumns, options) + val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns) + + if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) { + // Do not insert logical sort when concurrent output writers are enabled. + Seq.empty + } else { + // We should first sort by partition columns, then bucket id, and finally sorting columns. + (partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns) + .map(SortOrder(_, Ascending)) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala new file mode 100644 index 0000000000000..aa84184392709 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.logging.log4j.Level + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} + +abstract class V1WriteCommandSuiteBase extends QueryTest with SQLTestUtils { + + import testImplicits._ + + protected override def beforeAll(): Unit = { + super.beforeAll() + (0 to 20).map(i => (i, i % 5, (i % 10).toString)) + .toDF("i", "j", "k") + .write + .saveAsTable("t0") + } + + protected override def afterAll(): Unit = { + sql("drop table if exists t0") + super.afterAll() + } + + protected def checkOrdering(logAppender: LogAppender, matched: Boolean): Unit = { + assert(logAppender.loggingEvents.exists { event => + event.getLevel.equals(Level.INFO) && + event.getMessage.getFormattedMessage.contains("Output ordering is matched") + } == matched) + } +} + +class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSession { + test("FileFormatWriter should log when ordering is matched") { + Seq(true, false).foreach { enabled => + withSQLConf(SQLConf.PLANNED_WRITE_ENABLED.key -> enabled.toString) { + withTable("t1", "t2") { + val logAppender1 = new LogAppender() + withLogAppender(logAppender1) { + sql("CREATE TABLE t1 USING PARQUET AS SELECT * FROM t0") + checkOrdering(logAppender1, matched = true) + } + val logAppender2 = new LogAppender() + // Case 2: query is already sorted. + withLogAppender(logAppender2) { + sql("CREATE TABLE t2 USING PARQUET PARTITIONED BY (k) " + + "AS SELECT * FROM t0 ORDER BY k") + // TODO: in this case the executed plan of the command's query will be an AQE plan, + // and FileFormatWriter is not able to detect the actual ordering of the AQE plan. + checkOrdering(logAppender2, matched = false) + } + } + } + } + } + + test("create table with partition columns") { + Seq(true, false).foreach { enabled => + withSQLConf(SQLConf.PLANNED_WRITE_ENABLED.key -> enabled.toString) { + withTable("t") { + val logAppender = new LogAppender("create table") + withLogAppender(logAppender) { + sql("CREATE TABLE t USING PARQUET PARTITIONED BY (k) AS SELECT * FROM t0") + checkOrdering(logAppender, matched = enabled) + } + } + } + } + } + + test("insert into table with partition, bucketed and sort columns") { + Seq(true, false).foreach { enabled => + withSQLConf(SQLConf.PLANNED_WRITE_ENABLED.key -> enabled.toString) { + withTable("t") { + sql( + """ + |CREATE TABLE t(i INT, j INT) USING PARQUET + |PARTITIONED BY (k STRING) + |CLUSTERED BY (i, j) SORTED BY (j) INTO 2 BUCKETS + |""".stripMargin) + val logAppender = new LogAppender("insert into") + withLogAppender(logAppender) { + sql("INSERT INTO t SELECT * FROM t0") + checkOrdering(logAppender, matched = enabled) + } + } + } + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 96b41dd8e35fa..55644e6a341fd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -21,16 +21,17 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions.SortOrder import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.{DataWritingCommand, DDLUtils} -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation, V1WriteCommand, V1WritesUtils} import org.apache.spark.sql.hive.HiveSessionCatalog import org.apache.spark.util.Utils -trait CreateHiveTableAsSelectBase extends DataWritingCommand { +trait CreateHiveTableAsSelectBase extends V1WriteCommand with V1WritesHiveUtils { val tableDesc: CatalogTable val query: LogicalPlan val outputColumnNames: Seq[String] @@ -38,6 +39,21 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand { protected val tableIdentifier = tableDesc.identifier + override def requiredOrdering: Seq[SortOrder] = { + // If the table does not exist the schema should always be empty. + val table = if (tableDesc.schema.isEmpty) { + val tableSchema = CharVarcharUtils.getRawSchema(outputColumns.toStructType, conf) + tableDesc.copy(schema = tableSchema) + } else { + tableDesc + } + // For CTAS, there is no static partition values to insert. + val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap + val partitionColumns = getDynamicPartitionColumns(table, partition, query) + val options = getOptionsWithHiveBucketWrite(tableDesc.bucketSpec) + V1WritesUtils.getSortOrder(outputColumns, partitionColumns, tableDesc.bucketSpec, options) + } + override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val tableExists = catalog.tableExists(tableIdentifier) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 8fca95130dd8f..dcaeac63fb2b3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -17,23 +17,20 @@ package org.apache.spark.sql.hive.execution -import java.util.Locale - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.ql.ErrorMsg import org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.execution.datasources.{V1WriteCommand, V1WritesUtils} import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import 
org.apache.spark.sql.hive.client.HiveClientImpl @@ -76,7 +73,14 @@ case class InsertIntoHiveTable( query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean, - outputColumnNames: Seq[String]) extends SaveAsHiveFile { + outputColumnNames: Seq[String] + ) extends SaveAsHiveFile with V1WriteCommand with V1WritesHiveUtils { + + override def requiredOrdering: Seq[SortOrder] = { + val partitionColumns = getDynamicPartitionColumns(table, partition, query) + val options = getOptionsWithHiveBucketWrite(table.bucketSpec) + V1WritesUtils.getSortOrder(outputColumns, partitionColumns, table.bucketSpec, options) + } /** * Inserts all the rows in the table into Hive. Row objects are properly serialized with the @@ -133,53 +137,8 @@ case class InsertIntoHiveTable( val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) val numDynamicPartitions = partition.values.count(_.isEmpty) - val numStaticPartitions = partition.values.count(_.nonEmpty) - val partitionSpec = partition.map { - case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME - case (key, Some(value)) => key -> value - case (key, None) => key -> "" - } - - // All partition column names in the format of "//..." - val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") - val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty) - - // By this time, the partition map must match the table's partition columns - if (partitionColumnNames.toSet != partition.keySet) { - throw QueryExecutionErrors.requestedPartitionsMismatchTablePartitionsError(table, partition) - } - - // Validate partition spec if there exist any dynamic partitions - if (numDynamicPartitions > 0) { - // Report error if dynamic partitioning is not enabled - if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) { - throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg) - } - - // Report error if dynamic partition strict mode is on but no static partition is found - if (numStaticPartitions == 0 && - hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) { - throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg) - } - - // Report error if any static partition appears after a dynamic partition - val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty) - if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) { - throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg) - } - } - - val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name => - val attr = query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse { - throw QueryCompilationErrors.cannotResolveAttributeError( - name, query.output.map(_.name).mkString(", ")) - }.asInstanceOf[Attribute] - // SPARK-28054: Hive metastore is not case preserving and keeps partition columns - // with lower cased names. Hive will validate the column names in the partition directories - // during `loadDynamicPartitions`. Spark needs to write partition directories with lower-cased - // column names in order to make `loadDynamicPartitions` work. 
- attr.withName(name.toLowerCase(Locale.ROOT)) - } + val partitionSpec = getPartitionSpec(partition) + val partitionAttributes = getDynamicPartitionColumns(table, partition, query) val writtenParts = saveAsHiveFile( sparkSession = sparkSession, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala index 7f885729bd2be..799cea42e1e8a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -37,13 +37,13 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.DataWritingCommand -import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileFormatWriter} +import org.apache.spark.sql.execution.datasources.FileFormatWriter import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.client.HiveVersion // Base trait from which all hive insert statement physical execution extends. -private[hive] trait SaveAsHiveFile extends DataWritingCommand { +private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveUtils { var createdTempDir: Option[Path] = None @@ -86,9 +86,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { jobId = java.util.UUID.randomUUID().toString, outputPath = outputLocation) - val options = bucketSpec - .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true")) - .getOrElse(Map.empty) + val options = getOptionsWithHiveBucketWrite(bucketSpec) FileFormatWriter.write( sparkSession = sparkSession, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala new file mode 100644 index 0000000000000..752753f334a23 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.util.Locale + +import org.apache.hadoop.hive.ql.ErrorMsg +import org.apache.hadoop.hive.ql.plan.TableDesc + +import org.apache.spark.SparkException +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, ExternalCatalogUtils} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.execution.datasources.BucketingUtils +import org.apache.spark.sql.hive.client.HiveClientImpl + +trait V1WritesHiveUtils { + def getPartitionSpec(partition: Map[String, Option[String]]): Map[String, String] = { + partition.map { + case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME + case (key, Some(value)) => key -> value + case (key, None) => key -> "" + } + } + + def getDynamicPartitionColumns( + table: CatalogTable, + partition: Map[String, Option[String]], + query: LogicalPlan): Seq[Attribute] = { + val numDynamicPartitions = partition.values.count(_.isEmpty) + val numStaticPartitions = partition.values.count(_.nonEmpty) + val partitionSpec = getPartitionSpec(partition) + + val hiveQlTable = HiveClientImpl.toHiveTable(table) + val tableDesc = new TableDesc( + hiveQlTable.getInputFormatClass, + hiveQlTable.getOutputFormatClass, + hiveQlTable.getMetadata + ) + + // All partition column names in the format of "//..." + val partitionColumns = tableDesc.getProperties.getProperty("partition_columns") + val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty) + + // By this time, the partition map must match the table's partition columns + if (partitionColumnNames.toSet != partition.keySet) { + throw QueryExecutionErrors.requestedPartitionsMismatchTablePartitionsError(table, partition) + } + + val sessionState = SparkSession.active.sessionState + val hadoopConf = sessionState.newHadoopConf() + + // Validate partition spec if there exist any dynamic partitions + if (numDynamicPartitions > 0) { + // Report error if dynamic partitioning is not enabled + if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) { + throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg) + } + + // Report error if dynamic partition strict mode is on but no static partition is found + if (numStaticPartitions == 0 && + hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) { + throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg) + } + + // Report error if any static partition appears after a dynamic partition + val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty) + if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) { + throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg) + } + } + + partitionColumnNames.takeRight(numDynamicPartitions).map { name => + val attr = query.resolve(name :: Nil, sessionState.analyzer.resolver).getOrElse { + throw QueryCompilationErrors.cannotResolveAttributeError( + name, query.output.map(_.name).mkString(", ")) + }.asInstanceOf[Attribute] + // SPARK-28054: Hive metastore is not case preserving and keeps partition columns + // with lower cased names. Hive will validate the column names in the partition directories + // during `loadDynamicPartitions`. 
Spark needs to write partition directories with lower-cased + // column names in order to make `loadDynamicPartitions` work. + attr.withName(name.toLowerCase(Locale.ROOT)) + } + } + + def getOptionsWithHiveBucketWrite(bucketSpec: Option[BucketSpec]): Map[String, String] = { + bucketSpec + .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true")) + .getOrElse(Map.empty) + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala new file mode 100644 index 0000000000000..877cd7b2b03a7 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution.command + +import org.apache.spark.sql.execution.datasources.V1WriteCommandSuiteBase +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf + +class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingleton { + + test("create hive table as select") { + Seq(true, false).foreach { enabled => + withSQLConf( + SQLConf.PLANNED_WRITE_ENABLED.key -> enabled.toString, + "hive.exec.dynamic.partition.mode" -> "nonstrict") { + withTable("t") { + val logAppender = new LogAppender("create hive table") + withLogAppender(logAppender) { + sql( + """ + |CREATE TABLE t + |STORED AS PARQUET + |PARTITIONED BY (k) + |AS SELECT * FROM t0 + |""".stripMargin) + checkOrdering(logAppender, matched = enabled) + } + } + } + } + } + + test("insert into hive table") { + Seq(true, false).foreach { enabled => + withSQLConf( + SQLConf.PLANNED_WRITE_ENABLED.key -> enabled.toString, + "hive.exec.dynamic.partition.mode" -> "nonstrict") { + withTable("t") { + sql( + """ + |CREATE TABLE t (i INT, j INT) + |STORED AS PARQUET + |PARTITIONED BY (k STRING) + |CLUSTERED BY (i, j) SORTED BY (j) INTO 2 BUCKETS + |""".stripMargin) + + val logAppender = new LogAppender("insert into hive table") + withLogAppender(logAppender) { + sql("INSERT OVERWRITE t SELECT * FROM t0") + checkOrdering(logAppender, matched = enabled) + } + } + } + } + } +} From eab24ab1e368ee533ef89450751575e781926852 Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Fri, 8 Jul 2022 16:34:23 -0700 Subject: [PATCH 2/6] add aqe ordering and more tests --- .../apache/spark/sql/internal/SQLConf.scala | 16 +- .../adaptive/AdaptiveSparkPlanExec.scala | 9 +- .../command/createDataSourceTables.scala | 1 - .../datasources/FileFormatWriter.scala | 32 +-- .../sql/execution/datasources/V1Writes.scala | 40 +++- .../datasources/V1WriteCommandSuite.scala | 212 ++++++++++++++---- 
.../command/V1WriteHiveCommandSuite.scala | 40 ++-- 7 files changed, 243 insertions(+), 107 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 72720f329a1a8..967775ba43dcf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -412,6 +412,14 @@ object SQLConf { .longConf .createWithDefault(67108864L) + val PLANNED_WRITE_ENABLED = buildConf("spark.sql.optimizer.plannedWrite.enabled") + .internal() + .doc("When set to true, Spark optimizer will add logical sort operators to V1 write commands " + + "if needed so that `FileFormatWriter` does not need to insert physical sorts.") + .version("3.4.0") + .booleanConf + .createWithDefault(false) + val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed") .doc("When set to true Spark SQL will automatically select a compression codec for each " + "column based on statistics of the data.") @@ -3781,14 +3789,6 @@ object SQLConf { .intConf .createWithDefault(0) - val PLANNED_WRITE_ENABLED = buildConf("spark.sql.plannedWrite.enabled") - .internal() - .doc("When set to true, Spark adds logical sorts to V1 write commands if needed so that " + - "`FileFormatWriter` does not need to insert physical sorts.") - .version("3.2.0") - .booleanConf - .createWithDefault(false) - val INFER_NESTED_DICT_AS_STRUCT = buildConf("spark.sql.pyspark.inferNestedDictAsStruct.enabled") .doc("PySpark's SparkSession.createDataFrame infers the nested dict as a map by default. " + "When it set to true, it infers the nested dict as a struct.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 44a5ba4a547d9..6c34c3f999d55 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -30,7 +30,7 @@ import org.apache.spark.broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution} import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule} @@ -215,6 +215,13 @@ case class AdaptiveSparkPlanExec( executedPlan.resetMetrics() } + override def outputOrdering: Seq[SortOrder] = { + // AQE will not change the output ordering if it is defined in the logical plan. + // Use inputPlan's logicalLink here in case some top level physical nodes may be + // removed in `initialPlan`. + inputPlan.logicalLink.map(_.outputOrdering).getOrElse(Nil) + } + private def getExecutionId: Option[Long] = { // If the `QueryExecution` does not match the current execution ID, it means the execution ID // belongs to another (parent) query, and we should not call update UI in this query. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 7847727ccf841..50ad55d563338 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -152,7 +152,6 @@ case class CreateDataSourceTableAsSelectCommand( outputColumns, query, SparkSession.active.sessionState.conf.resolver) - // We do not need the path option from the table location to get writer bucket spec. val options = table.storage.properties V1WritesUtils.getSortOrder(outputColumns, partitionColumns, table.bucketSpec, options) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 59ba7c592eb5c..9542e82dae1f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -35,13 +35,11 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StringType -import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -53,25 +51,6 @@ object FileFormatWriter extends Logging { customPartitionLocations: Map[TablePartitionSpec, String], outputColumns: Seq[Attribute]) - /** A function that converts the empty string to null for partition values. */ - case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression { - override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v - override def nullable: Boolean = true - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - nullSafeCodeGen(ctx, ev, c => { - s"""if ($c.numBytes() == 0) { - | ${ev.isNull} = true; - | ${ev.value} = null; - |} else { - | ${ev.value} = $c; - |}""".stripMargin - }) - } - - override protected def withNewChildInternal(newChild: Expression): Empty2Null = - copy(child = newChild) - } - /** Describes how concurrent output writers should be executed. 
*/ case class ConcurrentOutputWriterSpec( maxWriters: Int, @@ -120,7 +99,7 @@ object FileFormatWriter extends Logging { val projectList: Seq[NamedExpression] = plan.output.map { case p if partitionSet.contains(p) && p.dataType == StringType && p.nullable => needConvert = true - Alias(Empty2Null(p), p.name)() + Alias(V1WritesUtils.Empty2Null(p), p.name)() case attr => attr } val empty2NullPlan = if (needConvert) ProjectExec(projectList, plan) else plan @@ -180,7 +159,14 @@ object FileFormatWriter extends Logging { try { val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) { - logInfo(s"Output ordering is matched for write job ${description.uuid}") + // When planned write is enabled, the optimizer rule V1Writes will add logical sort + // operator based on the required ordering of the V1 write command. So the output + // ordering of the physical plan should always match the required ordering. + // There are two cases where FileFormatWriter still needs to add physical sort: + // 1) When the planned write config is disabled. + // 2) When the concurrent writers are enabled (in this case the required ordering of a + // V1 write command will be empty). + logInfo(s"Output ordering is matched for write job ${description.uuid}.") (empty2NullPlan.execute(), None) } else { // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala index bb202f48f29fd..c5e57b5250fd2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala @@ -19,12 +19,14 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, BitwiseAnd, HiveHash, Literal, Pmod, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, BitwiseAnd, Expression, HiveHash, Literal, Pmod, SortOrder, String2StringExpression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.DataWritingCommand import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.unsafe.types.UTF8String trait V1WriteCommand extends DataWritingCommand { // Specify the required ordering for the V1 write command. `FileFormatWriter` will @@ -51,6 +53,8 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper { private def prepareQuery(write: V1WriteCommand, query: LogicalPlan): LogicalPlan = { val requiredOrdering = write.requiredOrdering val outputOrdering = query.outputOrdering + // Check if the ordering is already matched. It is needed to ensure the + // idempotency of the rule. val orderingMatched = if (requiredOrdering.length > outputOrdering.length) { false } else { @@ -61,7 +65,6 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper { if (orderingMatched) { query } else { - // TODO: check if we need Empty2Null conversion before sort. 
Sort(requiredOrdering, global = false, query) } } @@ -69,6 +72,25 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper { object V1WritesUtils { + /** A function that converts the empty string to null for partition values. */ + case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression { + override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v + override def nullable: Boolean = true + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + nullSafeCodeGen(ctx, ev, c => { + s"""if ($c.numBytes() == 0) { + | ${ev.isNull} = true; + | ${ev.value} = null; + |} else { + | ${ev.value} = $c; + |}""".stripMargin + }) + } + + override protected def withNewChildInternal(newChild: Expression): Empty2Null = + copy(child = newChild) + } + def getWriterBucketSpec( bucketSpec: Option[BucketSpec], dataColumns: Seq[Attribute], @@ -127,8 +149,18 @@ object V1WritesUtils { Seq.empty } else { // We should first sort by partition columns, then bucket id, and finally sorting columns. - (partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns) - .map(SortOrder(_, Ascending)) + val requiredOrdering = + partitionColumns ++writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns + + // Convert empty string partition columns to null when sorting the columns. + // TODO: this will cause output ordering mismatch in FileFormatWriter. + // requiredOrdering.map { + // case a: Attribute if partitionSet.contains(a) && a.dataType == StringType && a.nullable => + // Empty2Null(a) + // case o => o + // }.map(SortOrder(_, Ascending)) + + requiredOrdering.map(SortOrder(_, Ascending)) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala index aa84184392709..9c78c8ea448c3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala @@ -19,14 +19,19 @@ package org.apache.spark.sql.execution.datasources import org.apache.logging.log4j.Level -import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} +import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} +import org.apache.spark.sql.util.QueryExecutionListener abstract class V1WriteCommandSuiteBase extends QueryTest with SQLTestUtils { import testImplicits._ + setupTestData() + protected override def beforeAll(): Unit = { super.beforeAll() (0 to 20).map(i => (i, i % 5, (i % 10).toString)) @@ -40,66 +45,183 @@ abstract class V1WriteCommandSuiteBase extends QueryTest with SQLTestUtils { super.afterAll() } - protected def checkOrdering(logAppender: LogAppender, matched: Boolean): Unit = { + protected def withPlannedWrite(testFunc: Boolean => Any): Unit = { + Seq(true, false).foreach { enabled => + withSQLConf(SQLConf.PLANNED_WRITE_ENABLED.key -> enabled.toString) { + testFunc(enabled) + } + } + } + + // Execute a write query and check ordering of the plan. 
+ protected def executeAndCheckOrdering( + hasLogicalSort: Boolean, orderingMatched: Boolean)(query: => Unit): Unit = { + var optimizedPlan: LogicalPlan = null + + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + qe.optimizedPlan match { + case w: V1WriteCommand => + optimizedPlan = w.query + case _ => + } + } + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + + val logAppender = new LogAppender("v1 write") + + withLogAppender(logAppender) { + query + } + + // Check if the ordering is matched before FileFormatWriter execute the plan. + // Note, if we add empty2null in logical sort order, the output ordering will + // not match for string columns. + // For example: + // Project [i#19, j#20, empty2null(k#21) AS k#26] + // +- *(1) Sort [empty2null(k#21) ASC NULLS FIRST], false, 0 + // +- *(1) ColumnarToRow + // +- FileScan parquet spark_catalog.default.t0[i#19,j#20,k#21] + // Here the required ordering is `k#21` but the actual ordering is `k#26` due to the + // AliasAwareOutputOrdering trait of ProjectExec. + // One solution is to use the command query's output ordering instead of empty2null. assert(logAppender.loggingEvents.exists { event => event.getLevel.equals(Level.INFO) && - event.getMessage.getFormattedMessage.contains("Output ordering is matched") - } == matched) + event.getMessage.getFormattedMessage.contains("Output ordering is matched") + } === orderingMatched, "FileFormatWriter output ordering does not match") + + sparkContext.listenerBus.waitUntilEmpty() + + // Check whether a logical sort node is at the top of the logical plan of the write query. + if (optimizedPlan != null) { + assert(optimizedPlan.isInstanceOf[Sort] === hasLogicalSort) + } + + spark.listenerManager.register(listener) } } class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSession { - test("FileFormatWriter should log when ordering is matched") { - Seq(true, false).foreach { enabled => - withSQLConf(SQLConf.PLANNED_WRITE_ENABLED.key -> enabled.toString) { - withTable("t1", "t2") { - val logAppender1 = new LogAppender() - withLogAppender(logAppender1) { - sql("CREATE TABLE t1 USING PARQUET AS SELECT * FROM t0") - checkOrdering(logAppender1, matched = true) - } - val logAppender2 = new LogAppender() - // Case 2: query is already sorted. - withLogAppender(logAppender2) { - sql("CREATE TABLE t2 USING PARQUET PARTITIONED BY (k) " + - "AS SELECT * FROM t0 ORDER BY k") - // TODO: in this case the executed plan of the command's query will be an AQE plan, - // and FileFormatWriter is not able to detect the actual ordering of the AQE plan. 
- checkOrdering(logAppender2, matched = false) - } + + import testImplicits._ + + test("v1 write without partition columns") { + withPlannedWrite { enabled => + withTable("t") { + executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) { + sql("CREATE TABLE t USING PARQUET AS SELECT * FROM t0") } } } } - test("create table with partition columns") { - Seq(true, false).foreach { enabled => - withSQLConf(SQLConf.PLANNED_WRITE_ENABLED.key -> enabled.toString) { - withTable("t") { - val logAppender = new LogAppender("create table") - withLogAppender(logAppender) { - sql("CREATE TABLE t USING PARQUET PARTITIONED BY (k) AS SELECT * FROM t0") - checkOrdering(logAppender, matched = enabled) - } + test("v1 write with non-string partition columns") { + withPlannedWrite { enabled => + withTable("t") { + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + sql("CREATE TABLE t USING PARQUET PARTITIONED BY (j) AS SELECT i, k, j FROM t0") } } } } - test("insert into table with partition, bucketed and sort columns") { - Seq(true, false).foreach { enabled => - withSQLConf(SQLConf.PLANNED_WRITE_ENABLED.key -> enabled.toString) { - withTable("t") { - sql( - """ - |CREATE TABLE t(i INT, j INT) USING PARQUET - |PARTITIONED BY (k STRING) - |CLUSTERED BY (i, j) SORTED BY (j) INTO 2 BUCKETS - |""".stripMargin) - val logAppender = new LogAppender("insert into") - withLogAppender(logAppender) { - sql("INSERT INTO t SELECT * FROM t0") - checkOrdering(logAppender, matched = enabled) + test("v1 write with string partition columns") { + withPlannedWrite { enabled => + withTable("t") { + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + sql("CREATE TABLE t USING PARQUET PARTITIONED BY (k) AS SELECT * FROM t0") + } + } + } + } + + test("v1 write with partition, bucketed and sort columns") { + withPlannedWrite { enabled => + withTable("t") { + sql( + """ + |CREATE TABLE t(i INT, j INT) USING PARQUET + |PARTITIONED BY (k STRING) + |CLUSTERED BY (i, j) SORTED BY (j) INTO 2 BUCKETS + |""".stripMargin) + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + sql("INSERT INTO t SELECT * FROM t0") + } + } + } + } + + test("v1 write with already sorted plan - non-string partition column") { + withPlannedWrite { enabled => + withTable("t") { + sql( + """ + |CREATE TABLE t(i INT, k STRING) USING PARQUET + |PARTITIONED BY (j INT) + |""".stripMargin) + executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) { + sql("INSERT INTO t SELECT i, k, j FROM t0 ORDER BY j") + } + } + } + } + + test("v1 write with already sorted plan - string partition column") { + withPlannedWrite { enabled => + withTable("t") { + sql( + """ + |CREATE TABLE t(i INT, j INT) USING PARQUET + |PARTITIONED BY (k STRING) + |""".stripMargin) + executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) { + sql("INSERT INTO t SELECT * FROM t0 ORDER BY k") + } + } + } + } + + test("v1 write with null and empty string column values") { + withPlannedWrite { enabled => + withTempPath { path => + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + Seq((0, None), (1, Some("")), (2, None), (3, Some("x"))) + .toDF("id", "p") + .write + .partitionBy("p") + .parquet(path.toString) + checkAnswer( + spark.read.parquet(path.toString).where("p IS NULL").sort($"id"), + Seq(Row(0, null), Row(1, null), Row(2, null))) + } + } + } + } + + test("v1 write with AQE changing SMJ to BHJ") { + withPlannedWrite { enabled => + 
withTable("t") { + sql( + """ + |CREATE TABLE t(key INT, value STRING) USING PARQUET + |PARTITIONED BY (a INT) + |""".stripMargin) + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { + // The top level sort added by V1 write will be removed by the physical rule + // RemoveRedundantSorts initially, and during the execution AQE will change + // SMJ to BHJ which will remove the original output ordering from the SMJ. + // In this case AQE should still add back the sort node from the logical plan + // during re-planning, and ordering should be matched in FileFormatWriter. + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + sql( + """ + |INSERT INTO t + |SELECT key, value, a + |FROM testData JOIN testData2 ON key = a + |WHERE value = '1' + |""".stripMargin) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala index 877cd7b2b03a7..4cc77067bd0c0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala @@ -19,18 +19,14 @@ package org.apache.spark.sql.hive.execution.command import org.apache.spark.sql.execution.datasources.V1WriteCommandSuiteBase import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.internal.SQLConf class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingleton { test("create hive table as select") { - Seq(true, false).foreach { enabled => - withSQLConf( - SQLConf.PLANNED_WRITE_ENABLED.key -> enabled.toString, - "hive.exec.dynamic.partition.mode" -> "nonstrict") { - withTable("t") { - val logAppender = new LogAppender("create hive table") - withLogAppender(logAppender) { + withPlannedWrite { enabled => + withTable("t") { + withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { sql( """ |CREATE TABLE t @@ -38,7 +34,6 @@ class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingl |PARTITIONED BY (k) |AS SELECT * FROM t0 |""".stripMargin) - checkOrdering(logAppender, matched = enabled) } } } @@ -46,23 +41,18 @@ class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingl } test("insert into hive table") { - Seq(true, false).foreach { enabled => - withSQLConf( - SQLConf.PLANNED_WRITE_ENABLED.key -> enabled.toString, - "hive.exec.dynamic.partition.mode" -> "nonstrict") { - withTable("t") { - sql( - """ - |CREATE TABLE t (i INT, j INT) - |STORED AS PARQUET - |PARTITIONED BY (k STRING) - |CLUSTERED BY (i, j) SORTED BY (j) INTO 2 BUCKETS - |""".stripMargin) - - val logAppender = new LogAppender("insert into hive table") - withLogAppender(logAppender) { + withPlannedWrite { enabled => + withTable("t") { + sql( + """ + |CREATE TABLE t (i INT, j INT) + |STORED AS PARQUET + |PARTITIONED BY (k STRING) + |CLUSTERED BY (i, j) SORTED BY (j) INTO 2 BUCKETS + |""".stripMargin) + withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { sql("INSERT OVERWRITE t SELECT * FROM t0") - checkOrdering(logAppender, matched = enabled) } } } From 021ff952ee64f82dc6b90dd112fc281cc7f08440 Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Tue, 12 Jul 2022 11:33:24 -0400 Subject: 
[PATCH 3/6] turn on config --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 967775ba43dcf..631c89d798fe1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -418,7 +418,7 @@ object SQLConf { "if needed so that `FileFormatWriter` does not need to insert physical sorts.") .version("3.4.0") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed") .doc("When set to true Spark SQL will automatically select a compression codec for each " + From 44662ed36b3d40089530d8e3b6b147a60a67f93a Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Mon, 18 Jul 2022 12:37:29 -0700 Subject: [PATCH 4/6] update tests --- .../datasources/FileFormatWriter.scala | 47 +++++++++++++++---- .../sql/execution/datasources/V1Writes.scala | 39 ++------------- .../datasources/V1WriteCommandSuite.scala | 26 ++-------- .../command/V1WriteHiveCommandSuite.scala | 15 ++++++ 4 files changed, 62 insertions(+), 65 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 9542e82dae1f8..7b79095373c85 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -35,11 +35,13 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -51,11 +53,36 @@ object FileFormatWriter extends Logging { customPartitionLocations: Map[TablePartitionSpec, String], outputColumns: Seq[Attribute]) + /** A function that converts the empty string to null for partition values. */ + case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression { + override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v + override def nullable: Boolean = true + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + nullSafeCodeGen(ctx, ev, c => { + s"""if ($c.numBytes() == 0) { + | ${ev.isNull} = true; + | ${ev.value} = null; + |} else { + | ${ev.value} = $c; + |}""".stripMargin + }) + } + + override protected def withNewChildInternal(newChild: Expression): Empty2Null = + copy(child = newChild) + } + /** Describes how concurrent output writers should be executed. 
*/ case class ConcurrentOutputWriterSpec( maxWriters: Int, createSorter: () => UnsafeExternalRowSorter) + /** + * A variable used in tests to check whether the output ordering of the query matches the + * required ordering of the write command. + */ + var outputOrderingMatched: Boolean = false + /** * Basic work flow of this command is: * 1. Driver side setup, including output committer initialization and data source specific @@ -99,7 +126,7 @@ object FileFormatWriter extends Logging { val projectList: Seq[NamedExpression] = plan.output.map { case p if partitionSet.contains(p) && p.dataType == StringType && p.nullable => needConvert = true - Alias(V1WritesUtils.Empty2Null(p), p.name)() + Alias(Empty2Null(p), p.name)() case attr => attr } val empty2NullPlan = if (needConvert) ProjectExec(projectList, plan) else plan @@ -157,16 +184,18 @@ object FileFormatWriter extends Logging { // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. committer.setupJob(job) + // When `PLANNED_WRITE_ENABLED` is true, the optimizer rule V1Writes will add logical sort + // operator based on the required ordering of the V1 write command. So the output + // ordering of the physical plan should always match the required ordering. Here + // we set the variable to verify this behavior in tests. + // There are two cases where FileFormatWriter still needs to add physical sort: + // 1) When the planned write config is disabled. + // 2) When the concurrent writers are enabled (in this case the required ordering of a + // V1 write command will be empty). + if (Utils.isTesting) outputOrderingMatched = orderingMatched + try { val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) { - // When planned write is enabled, the optimizer rule V1Writes will add logical sort - // operator based on the required ordering of the V1 write command. So the output - // ordering of the physical plan should always match the required ordering. - // There are two cases where FileFormatWriter still needs to add physical sort: - // 1) When the planned write config is disabled. - // 2) When the concurrent writers are enabled (in this case the required ordering of a - // V1 write command will be empty). 
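As a quick reference for the two cases listed in the comment above, these are the session settings involved (a sketch assuming a SparkSession named `spark`; the values shown are illustrative):

    // Planned writes let the optimizer add the sort for V1 write commands.
    spark.conf.set("spark.sql.plannedWrite.enabled", "true")
    // A value > 0 enables concurrent output writers, in which case the V1 write
    // command reports an empty required ordering and no sort is planned.
    spark.conf.set("spark.sql.maxConcurrentOutputFileWriters", "0")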
- logInfo(s"Output ordering is matched for write job ${description.uuid}.") (empty2NullPlan.execute(), None) } else { // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala index c5e57b5250fd2..94e5f3ccaf4d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala @@ -19,14 +19,12 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, BitwiseAnd, Expression, HiveHash, Literal, Pmod, SortOrder, String2StringExpression, UnaryExpression} -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, BitwiseAnd, HiveHash, Literal, Pmod, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.DataWritingCommand import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.unsafe.types.UTF8String trait V1WriteCommand extends DataWritingCommand { // Specify the required ordering for the V1 write command. `FileFormatWriter` will @@ -72,25 +70,6 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper { object V1WritesUtils { - /** A function that converts the empty string to null for partition values. */ - case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression { - override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v - override def nullable: Boolean = true - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - nullSafeCodeGen(ctx, ev, c => { - s"""if ($c.numBytes() == 0) { - | ${ev.isNull} = true; - | ${ev.value} = null; - |} else { - | ${ev.value} = $c; - |}""".stripMargin - }) - } - - override protected def withNewChildInternal(newChild: Expression): Empty2Null = - copy(child = newChild) - } - def getWriterBucketSpec( bucketSpec: Option[BucketSpec], dataColumns: Seq[Attribute], @@ -149,18 +128,10 @@ object V1WritesUtils { Seq.empty } else { // We should first sort by partition columns, then bucket id, and finally sorting columns. - val requiredOrdering = - partitionColumns ++writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns - - // Convert empty string partition columns to null when sorting the columns. - // TODO: this will cause output ordering mismatch in FileFormatWriter. - // requiredOrdering.map { - // case a: Attribute if partitionSet.contains(a) && a.dataType == StringType && a.nullable => - // Empty2Null(a) - // case o => o - // }.map(SortOrder(_, Ascending)) - - requiredOrdering.map(SortOrder(_, Ascending)) + // Note we do not need to convert empty string partition columns to null when sorting the + // columns since null and empty string values will be next to each other. 
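To see why the note above holds, here is a small illustration (assumes a SparkSession `spark` with `import spark.implicits._`; not part of the patch). Under the ascending, nulls-first order used for the required ordering, null partition values come first and the empty string follows immediately, since it is the smallest non-null string, so all rows destined for the default (null) partition are already contiguous without applying Empty2Null to the sort keys:

    Seq(Some("x"), None, Some(""), Some("a")).toDF("p")
      .sort($"p".asc_nulls_first)
      .show()
    // +----+
    // |   p|
    // +----+
    // |null|
    // |    |
    // |   a|
    // |   x|
    // +----+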
+ (partitionColumns ++writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns) + .map(SortOrder(_, Ascending)) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala index 9c78c8ea448c3..dddad79732bc0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.datasources -import org.apache.logging.log4j.Level - import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} import org.apache.spark.sql.execution.QueryExecution @@ -70,27 +68,11 @@ abstract class V1WriteCommandSuiteBase extends QueryTest with SQLTestUtils { } spark.listenerManager.register(listener) - val logAppender = new LogAppender("v1 write") - - withLogAppender(logAppender) { - query - } + query - // Check if the ordering is matched before FileFormatWriter execute the plan. - // Note, if we add empty2null in logical sort order, the output ordering will - // not match for string columns. - // For example: - // Project [i#19, j#20, empty2null(k#21) AS k#26] - // +- *(1) Sort [empty2null(k#21) ASC NULLS FIRST], false, 0 - // +- *(1) ColumnarToRow - // +- FileScan parquet spark_catalog.default.t0[i#19,j#20,k#21] - // Here the required ordering is `k#21` but the actual ordering is `k#26` due to the - // AliasAwareOutputOrdering trait of ProjectExec. - // One solution is to use the command query's output ordering instead of empty2null. - assert(logAppender.loggingEvents.exists { event => - event.getLevel.equals(Level.INFO) && - event.getMessage.getFormattedMessage.contains("Output ordering is matched") - } === orderingMatched, "FileFormatWriter output ordering does not match") + // Check whether the output ordering is matched before FileFormatWriter executes rdd. 
+ assert(FileFormatWriter.outputOrderingMatched == orderingMatched, + s"Expect: $orderingMatched, Actual: ${FileFormatWriter.outputOrderingMatched}") sparkContext.listenerBus.waitUntilEmpty() diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala index 4cc77067bd0c0..7181f8866cb02 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala @@ -22,6 +22,21 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingleton { + test("create hive table as select - no partition column") { + withPlannedWrite { enabled => + withTable("t") { + executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) { + sql( + """ + |CREATE TABLE t + |STORED AS PARQUET + |AS SELECT * FROM t0 + |""".stripMargin) + } + } + } + } + test("create hive table as select") { withPlannedWrite { enabled => withTable("t") { From f5e05ae44dbe17241c4f0d3307962056f8224dfd Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Mon, 18 Jul 2022 15:09:00 -0700 Subject: [PATCH 5/6] apply AQE on top of DataWritingCommandExec --- .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 9 +-------- .../sql/execution/adaptive/InsertAdaptiveSparkPlan.scala | 3 +-- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 6c34c3f999d55..44a5ba4a547d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -30,7 +30,7 @@ import org.apache.spark.broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution} import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule} @@ -215,13 +215,6 @@ case class AdaptiveSparkPlanExec( executedPlan.resetMetrics() } - override def outputOrdering: Seq[SortOrder] = { - // AQE will not change the output ordering if it is defined in the logical plan. - // Use inputPlan's logicalLink here in case some top level physical nodes may be - // removed in `initialPlan`. - inputPlan.logicalLink.map(_.outputOrdering).getOrElse(Nil) - } - private def getExecutionId: Option[Long] = { // If the `QueryExecution` does not match the current execution ID, it means the execution ID // belongs to another (parent) query, and we should not call update UI in this query. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala index 4410f7fea81af..1c34e61c593ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec} +import org.apache.spark.sql.execution.command.ExecutedCommandExec import org.apache.spark.sql.execution.datasources.v2.V2CommandExec import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.internal.SQLConf @@ -46,7 +46,6 @@ case class InsertAdaptiveSparkPlan( case _ if !conf.adaptiveExecutionEnabled => plan case _: ExecutedCommandExec => plan case _: CommandResultExec => plan - case c: DataWritingCommandExec => c.copy(child = apply(c.child)) case c: V2CommandExec => c.withNewChildren(c.children.map(apply)) case _ if shouldApplyAQE(plan, isSubquery) => if (supportAdaptive(plan)) { From aa00cdabc4d49282047c15a66b6e5b6e92179e7b Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Mon, 18 Jul 2022 21:50:40 -0700 Subject: [PATCH 6/6] fix AQE tests and add more Hive tests --- .../adaptive/InsertAdaptiveSparkPlan.scala | 5 ++- .../datasources/FileFormatWriter.scala | 2 +- .../adaptive/AdaptiveQueryExecSuite.scala | 37 ++++++++++----- .../datasources/V1WriteCommandSuite.scala | 9 +++- .../command/V1WriteHiveCommandSuite.scala | 45 +++++++++++++++---- 5 files changed, 73 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala index 1c34e61c593ef..94cef481ecfc6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala @@ -26,7 +26,8 @@ import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.ExecutedCommandExec +import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec} +import org.apache.spark.sql.execution.datasources.V1WriteCommand import org.apache.spark.sql.execution.datasources.v2.V2CommandExec import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.internal.SQLConf @@ -47,6 +48,8 @@ case class InsertAdaptiveSparkPlan( case _: ExecutedCommandExec => plan case _: CommandResultExec => plan case c: V2CommandExec => c.withNewChildren(c.children.map(apply)) + case c: DataWritingCommandExec if !c.cmd.isInstanceOf[V1WriteCommand] => + c.copy(child = apply(c.child)) case _ if shouldApplyAQE(plan, isSubquery) => if (supportAdaptive(plan)) { try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 7b79095373c85..6b48420920d96 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -81,7 +81,7 @@ object FileFormatWriter extends Logging { * A variable used in tests to check whether the output ordering of the query matches the * required ordering of the write command. */ - var outputOrderingMatched: Boolean = false + private[sql] var outputOrderingMatched: Boolean = false /** * Basic work flow of this command is: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 77d7261fb65f0..5d8e4bbecfecc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} -import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, UnaryExecNode, UnionExec} +import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, UnionExec} import org.apache.spark.sql.execution.aggregate.BaseAggregateExec import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.datasources.noop.NoopDataSource @@ -1119,16 +1119,27 @@ class AdaptiveQueryExecSuite } } - test("SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of write commands") { + test("SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) withTable("t1") { - val plan = sql("CREATE TABLE t1 USING parquet AS SELECT 1 col").queryExecution.executedPlan - assert(plan.isInstanceOf[CommandResultExec]) - val commandResultExec = plan.asInstanceOf[CommandResultExec] - assert(commandResultExec.commandPhysicalPlan.isInstanceOf[DataWritingCommandExec]) - assert(commandResultExec.commandPhysicalPlan.asInstanceOf[DataWritingCommandExec] - .child.isInstanceOf[AdaptiveSparkPlanExec]) + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + assert(plan.asInstanceOf[V2TableWriteExec].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) } 
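For contrast with the v2 write tested above, the expected plan shape for a V1 write command after this change is roughly the following (node names as asserted in the updated listener test below; layout illustrative):

    AdaptiveSparkPlan
    +- Execute CreateDataSourceTableAsSelectCommand
       +- ... child query, re-optimized by AQE at execution time ...

That is, AQE now wraps the whole V1 write command, while a DataWritingCommandExec whose command is not a V1WriteCommand keeps AQE only on its child plan.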
} } @@ -1179,7 +1190,10 @@ class AdaptiveQueryExecSuite override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { case SparkListenerSQLAdaptiveExecutionUpdate(_, _, planInfo) => - assert(planInfo.nodeName == "Execute CreateDataSourceTableAsSelectCommand") + assert(planInfo.nodeName == "AdaptiveSparkPlan") + assert(planInfo.children.size == 1) + assert(planInfo.children.head.nodeName == + "Execute CreateDataSourceTableAsSelectCommand") checkDone = true case _ => // ignore other events } @@ -1584,9 +1598,8 @@ class AdaptiveQueryExecSuite var noLocalread: Boolean = false val listener = new QueryExecutionListener { override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { - qe.executedPlan match { - case plan@(_: DataWritingCommandExec | _: V2TableWriteExec) => - assert(plan.asInstanceOf[UnaryExecNode].child.isInstanceOf[AdaptiveSparkPlanExec]) + stripAQEPlan(qe.executedPlan) match { + case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) => noLocalread = collect(plan) { case exec: AQEShuffleReadExec if exec.isLocalRead => exec }.isEmpty diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala index dddad79732bc0..350cac4913971 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala @@ -78,10 +78,11 @@ abstract class V1WriteCommandSuiteBase extends QueryTest with SQLTestUtils { // Check whether a logical sort node is at the top of the logical plan of the write query. if (optimizedPlan != null) { - assert(optimizedPlan.isInstanceOf[Sort] === hasLogicalSort) + assert(optimizedPlan.isInstanceOf[Sort] == hasLogicalSort, + s"Expect hasLogicalSort: $hasLogicalSort, Actual: ${optimizedPlan.isInstanceOf[Sort]}") } - spark.listenerManager.register(listener) + spark.listenerManager.unregister(listener) } } @@ -177,6 +178,10 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSessio checkAnswer( spark.read.parquet(path.toString).where("p IS NULL").sort($"id"), Seq(Row(0, null), Row(1, null), Row(2, null))) + // Check the empty string and null values should be written to the same file. 
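For clarity on the file count asserted next, the expected on-disk layout for the rows written above is sketched here (directory names assumed from Spark's default null-partition handling, not asserted by the test):

    path/
      p=__HIVE_DEFAULT_PARTITION__/   <- ids 0, 1, 2 (null and empty-string partition values)
      p=x/                            <- id 3

so after filtering out hidden and metadata entries, `path.listFiles()` should contain exactly these two partition directories.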
+ val files = path.listFiles().filterNot( + f => f.getName.startsWith(".") || f.getName.startsWith("_")) + assert(files.length == 2) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala index 7181f8866cb02..2c8b200150194 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala @@ -26,12 +26,7 @@ class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingl withPlannedWrite { enabled => withTable("t") { executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) { - sql( - """ - |CREATE TABLE t - |STORED AS PARQUET - |AS SELECT * FROM t0 - |""".stripMargin) + sql("CREATE TABLE t AS SELECT * FROM t0") } } } @@ -45,7 +40,6 @@ class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingl sql( """ |CREATE TABLE t - |STORED AS PARQUET |PARTITIONED BY (k) |AS SELECT * FROM t0 |""".stripMargin) @@ -61,16 +55,49 @@ class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingl sql( """ |CREATE TABLE t (i INT, j INT) - |STORED AS PARQUET |PARTITIONED BY (k STRING) |CLUSTERED BY (i, j) SORTED BY (j) INTO 2 BUCKETS |""".stripMargin) withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { - sql("INSERT OVERWRITE t SELECT * FROM t0") + sql("INSERT INTO t SELECT * FROM t0") } } } } } + + test("insert overwrite hive table") { + withPlannedWrite { enabled => + withTable("t") { + withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { + sql( + """ + |CREATE TABLE t + |PARTITIONED BY (k) + |AS SELECT * FROM t0 + |""".stripMargin) + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + sql("INSERT OVERWRITE t SELECT j AS i, i AS j, k FROM t0") + } + } + } + } + } + + test("insert into hive table with static partitions only") { + withPlannedWrite { enabled => + withTable("t") { + sql( + """ + |CREATE TABLE t (i INT, j INT) + |PARTITIONED BY (k STRING) + |""".stripMargin) + // No dynamic partition so no sort is needed. + executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) { + sql("INSERT INTO t PARTITION (k='0') SELECT i, j FROM t0 WHERE k = '0'") + } + } + } + } }
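Taken together, the intended effect of this patch series can be sketched with a short session (illustrative only: the table, data, and settings below are assumptions, not output captured from the patch):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("planned-write-sketch")
      .master("local[2]")
      .getOrCreate()
    spark.conf.set("spark.sql.plannedWrite.enabled", "true")

    spark.sql("CREATE TABLE dst (i INT) USING parquet PARTITIONED BY (k STRING)")

    // With planned writes enabled, the V1Writes rule adds a Sort on the dynamic partition
    // column k below the write command, so FileFormatWriter should not need to insert its
    // own physical sort at execution time.
    spark.sql(
      """EXPLAIN FORMATTED
        |INSERT INTO dst
        |SELECT CAST(id AS INT) AS i, CAST(id % 3 AS STRING) AS k FROM range(10)
        |""".stripMargin).show(truncate = false)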