diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1b7857ead59fa..631c89d798fe1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -412,6 +412,14 @@ object SQLConf { .longConf .createWithDefault(67108864L) + val PLANNED_WRITE_ENABLED = buildConf("spark.sql.optimizer.plannedWrite.enabled") + .internal() + .doc("When set to true, Spark optimizer will add logical sort operators to V1 write commands " + + "if needed so that `FileFormatWriter` does not need to insert physical sorts.") + .version("3.4.0") + .booleanConf + .createWithDefault(true) + val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed") .doc("When set to true Spark SQL will automatically select a compression codec for each " + "column based on statistics of the data.") @@ -4617,6 +4625,8 @@ class SQLConf extends Serializable with Logging { def maxConcurrentOutputFileWriters: Int = getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS) + def plannedWriteEnabled: Boolean = getConf(SQLConf.PLANNED_WRITE_ENABLED) + def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT) def legacyInferArrayTypeFromFirstElement: Boolean = getConf( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index bffa1d1dae790..72bdab409a9e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -23,8 +23,7 @@ import org.apache.spark.sql.catalyst.optimizer._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.CatalogManager -import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions -import org.apache.spark.sql.execution.datasources.SchemaPruning +import org.apache.spark.sql.execution.datasources.{PruneFileSourcePartitions, SchemaPruning, V1Writes} import org.apache.spark.sql.execution.datasources.v2.{GroupBasedRowLevelOperationScanPlanning, OptimizeMetadataOnlyDeleteFromTable, V2ScanPartitioningAndOrdering, V2ScanRelationPushDown, V2Writes} import org.apache.spark.sql.execution.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning} import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs} @@ -39,6 +38,7 @@ class SparkOptimizer( // TODO: move SchemaPruning into catalyst Seq(SchemaPruning) :+ GroupBasedRowLevelOperationScanPlanning :+ + V1Writes :+ V2ScanRelationPushDown :+ V2ScanPartitioningAndOrdering :+ V2Writes :+ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala index 4410f7fea81af..94cef481ecfc6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.execution._ import 
org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec} +import org.apache.spark.sql.execution.datasources.V1WriteCommand import org.apache.spark.sql.execution.datasources.v2.V2CommandExec import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.internal.SQLConf @@ -46,8 +47,9 @@ case class InsertAdaptiveSparkPlan( case _ if !conf.adaptiveExecutionEnabled => plan case _: ExecutedCommandExec => plan case _: CommandResultExec => plan - case c: DataWritingCommandExec => c.copy(child = apply(c.child)) case c: V2CommandExec => c.withNewChildren(c.children.map(apply)) + case c: DataWritingCommandExec if !c.cmd.isInstanceOf[V1WriteCommand] => + c.copy(child = apply(c.child)) case _ if shouldApplyAQE(plan, isSubquery) => if (supportAdaptive(plan)) { try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index e64426f8de8f3..50ad55d563338 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -20,7 +20,9 @@ package org.apache.spark.sql.execution.command import java.net.URI import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.errors.QueryCompilationErrors @@ -141,7 +143,18 @@ case class CreateDataSourceTableAsSelectCommand( mode: SaveMode, query: LogicalPlan, outputColumnNames: Seq[String]) - extends DataWritingCommand { + extends V1WriteCommand { + + override def requiredOrdering: Seq[SortOrder] = { + val unresolvedPartitionColumns = table.partitionColumnNames.map(UnresolvedAttribute.quoted) + val partitionColumns = DataSource.resolvePartitionColumns( + unresolvedPartitionColumns, + outputColumns, + query, + SparkSession.active.sessionState.conf.resolver) + val options = table.storage.properties + V1WritesUtils.getSortOrder(outputColumns, partitionColumns, table.bucketSpec, options) + } override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { assert(table.tableType != CatalogTableType.VIEW) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index a220fd334a5ce..8f8846b89f3f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -29,8 +29,9 @@ import org.apache.spark.SparkException import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils} import 
org.apache.spark.sql.connector.catalog.TableProvider @@ -519,18 +520,8 @@ case class DataSource( case format: FileFormat => disallowWritingIntervals(outputColumns.map(_.dataType), forbidAnsiIntervals = false) val cmd = planForWritingFileFormat(format, mode, data) - val resolvedPartCols = cmd.partitionColumns.map { col => - // The partition columns created in `planForWritingFileFormat` should always be - // `UnresolvedAttribute` with a single name part. - assert(col.isInstanceOf[UnresolvedAttribute]) - val unresolved = col.asInstanceOf[UnresolvedAttribute] - assert(unresolved.nameParts.length == 1) - val name = unresolved.nameParts.head - outputColumns.find(a => equality(a.name, name)).getOrElse { - throw QueryCompilationErrors.cannotResolveAttributeError( - name, data.output.map(_.name).mkString(", ")) - } - } + val resolvedPartCols = + DataSource.resolvePartitionColumns(cmd.partitionColumns, outputColumns, data, equality) val resolved = cmd.copy( partitionColumns = resolvedPartCols, outputColumnNames = outputColumnNames) @@ -836,4 +827,26 @@ object DataSource extends Logging { throw QueryCompilationErrors.writeEmptySchemasUnsupportedByDataSourceError() } } + + /** + * Resolve partition columns using output columns of the query plan. + */ + def resolvePartitionColumns( + partitionColumns: Seq[Attribute], + outputColumns: Seq[Attribute], + plan: LogicalPlan, + resolver: Resolver): Seq[Attribute] = { + partitionColumns.map { col => + // The partition columns created in `planForWritingFileFormat` should always be + // `UnresolvedAttribute` with a single name part. + assert(col.isInstanceOf[UnresolvedAttribute]) + val unresolved = col.asInstanceOf[UnresolvedAttribute] + assert(unresolved.nameParts.length == 1) + val name = unresolved.nameParts.head + outputColumns.find(a => resolver(a.name, name)).getOrElse { + throw QueryCompilationErrors.cannotResolveAttributeError( + name, plan.output.map(_.name).mkString(", ")) + } + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index e75041b7fb0b2..6b48420920d96 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter} @@ -78,6 +77,12 @@ object FileFormatWriter extends Logging { maxWriters: Int, createSorter: () => UnsafeExternalRowSorter) + /** + * A variable used in tests to check whether the output ordering of the query matches the + * required ordering of the write command. + */ + private[sql] var outputOrderingMatched: Boolean = false + /** * Basic work flow of this command is: * 1. 
Driver side setup, including output committer initialization and data source specific @@ -126,38 +131,8 @@ object FileFormatWriter extends Logging { } val empty2NullPlan = if (needConvert) ProjectExec(projectList, plan) else plan - val writerBucketSpec = bucketSpec.map { spec => - val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get) - - if (options.getOrElse(BucketingUtils.optionForHiveCompatibleBucketWrite, "false") == - "true") { - // Hive bucketed table: use `HiveHash` and bitwise-and as bucket id expression. - // Without the extra bitwise-and operation, we can get wrong bucket id when hash value of - // columns is negative. See Hive implementation in - // `org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#getBucketNumber()`. - val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue)) - val bucketIdExpression = Pmod(hashId, Literal(spec.numBuckets)) - - // The bucket file name prefix is following Hive, Presto and Trino conversion, so this - // makes sure Hive bucketed table written by Spark, can be read by other SQL engines. - // - // Hive: `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`. - // Trino: `io.trino.plugin.hive.BackgroundHiveSplitLoader#BUCKET_PATTERNS`. - val fileNamePrefix = (bucketId: Int) => f"$bucketId%05d_0_" - WriterBucketSpec(bucketIdExpression, fileNamePrefix) - } else { - // Spark bucketed table: use `HashPartitioning.partitionIdExpression` as bucket id - // expression, so that we can guarantee the data distribution is same between shuffle and - // bucketed data source, which enables us to only shuffle one side when join a bucketed - // table and a normal one. - val bucketIdExpression = HashPartitioning(bucketColumns, spec.numBuckets) - .partitionIdExpression - WriterBucketSpec(bucketIdExpression, (_: Int) => "") - } - } - val sortColumns = bucketSpec.toSeq.flatMap { - spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get) - } + val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, dataColumns, options) + val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns) val caseInsensitiveOptions = CaseInsensitiveMap(options) @@ -209,6 +184,16 @@ object FileFormatWriter extends Logging { // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. committer.setupJob(job) + // When `PLANNED_WRITE_ENABLED` is true, the optimizer rule V1Writes will add logical sort + // operator based on the required ordering of the V1 write command. So the output + // ordering of the physical plan should always match the required ordering. Here + // we set the variable to verify this behavior in tests. + // There are two cases where FileFormatWriter still needs to add physical sort: + // 1) When the planned write config is disabled. + // 2) When the concurrent writers are enabled (in this case the required ordering of a + // V1 write command will be empty). 
+ if (Utils.isTesting) outputOrderingMatched = orderingMatched + try { val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) { (empty2NullPlan.execute(), None) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index d773d4bd271b3..e20d9ed8b537a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} @@ -57,7 +57,7 @@ case class InsertIntoHadoopFsRelationCommand( catalogTable: Option[CatalogTable], fileIndex: Option[FileIndex], outputColumnNames: Seq[String]) - extends DataWritingCommand { + extends V1WriteCommand { private lazy val parameters = CaseInsensitiveMap(options) @@ -74,6 +74,9 @@ case class InsertIntoHadoopFsRelationCommand( staticPartitions.size < partitionColumns.length } + override def requiredOrdering: Seq[SortOrder] = + V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options) + override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { // Most formats don't do well with duplicate columns, so lets not allow that SchemaUtils.checkColumnNameDuplication( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala new file mode 100644 index 0000000000000..94e5f3ccaf4d0 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.catalyst.SQLConfHelper +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, BitwiseAnd, HiveHash, Literal, Pmod, SortOrder} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} +import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.DataWritingCommand +import org.apache.spark.sql.internal.SQLConf + +trait V1WriteCommand extends DataWritingCommand { + // Specify the required ordering for the V1 write command. `FileFormatWriter` will + // add SortExec if necessary when the requiredOrdering is empty. + def requiredOrdering: Seq[SortOrder] +} + +/** + * A rule that adds logical sorts to V1 data writing commands. + */ +object V1Writes extends Rule[LogicalPlan] with SQLConfHelper { + override def apply(plan: LogicalPlan): LogicalPlan = { + if (conf.plannedWriteEnabled) { + plan.transformDown { + case write: V1WriteCommand => + val newQuery = prepareQuery(write, write.query) + write.withNewChildren(newQuery :: Nil) + } + } else { + plan + } + } + + private def prepareQuery(write: V1WriteCommand, query: LogicalPlan): LogicalPlan = { + val requiredOrdering = write.requiredOrdering + val outputOrdering = query.outputOrdering + // Check if the ordering is already matched. It is needed to ensure the + // idempotency of the rule. + val orderingMatched = if (requiredOrdering.length > outputOrdering.length) { + false + } else { + requiredOrdering.zip(outputOrdering).forall { + case (requiredOrder, outputOrder) => requiredOrder.semanticEquals(outputOrder) + } + } + if (orderingMatched) { + query + } else { + Sort(requiredOrdering, global = false, query) + } + } +} + +object V1WritesUtils { + + def getWriterBucketSpec( + bucketSpec: Option[BucketSpec], + dataColumns: Seq[Attribute], + options: Map[String, String]): Option[WriterBucketSpec] = { + bucketSpec.map { spec => + val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get) + + if (options.getOrElse(BucketingUtils.optionForHiveCompatibleBucketWrite, "false") == + "true") { + // Hive bucketed table: use `HiveHash` and bitwise-and as bucket id expression. + // Without the extra bitwise-and operation, we can get wrong bucket id when hash value of + // columns is negative. See Hive implementation in + // `org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#getBucketNumber()`. + val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue)) + val bucketIdExpression = Pmod(hashId, Literal(spec.numBuckets)) + + // The bucket file name prefix is following Hive, Presto and Trino conversion, so this + // makes sure Hive bucketed table written by Spark, can be read by other SQL engines. + // + // Hive: `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`. + // Trino: `io.trino.plugin.hive.BackgroundHiveSplitLoader#BUCKET_PATTERNS`. + val fileNamePrefix = (bucketId: Int) => f"$bucketId%05d_0_" + WriterBucketSpec(bucketIdExpression, fileNamePrefix) + } else { + // Spark bucketed table: use `HashPartitioning.partitionIdExpression` as bucket id + // expression, so that we can guarantee the data distribution is same between shuffle and + // bucketed data source, which enables us to only shuffle one side when join a bucketed + // table and a normal one. 
+ val bucketIdExpression = HashPartitioning(bucketColumns, spec.numBuckets) + .partitionIdExpression + WriterBucketSpec(bucketIdExpression, (_: Int) => "") + } + } + } + + def getBucketSortColumns( + bucketSpec: Option[BucketSpec], + dataColumns: Seq[Attribute]): Seq[Attribute] = { + bucketSpec.toSeq.flatMap { + spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get) + } + } + + def getSortOrder( + outputColumns: Seq[Attribute], + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String]): Seq[SortOrder] = { + val partitionSet = AttributeSet(partitionColumns) + val dataColumns = outputColumns.filterNot(partitionSet.contains) + val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, dataColumns, options) + val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns) + + if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) { + // Do not insert logical sort when concurrent output writers are enabled. + Seq.empty + } else { + // We should first sort by partition columns, then bucket id, and finally sorting columns. + // Note we do not need to convert empty string partition columns to null when sorting the + // columns since null and empty string values will be next to each other. + (partitionColumns ++writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns) + .map(SortOrder(_, Ascending)) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 77d7261fb65f0..5d8e4bbecfecc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} -import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, UnaryExecNode, UnionExec} +import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, UnionExec} import org.apache.spark.sql.execution.aggregate.BaseAggregateExec import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.datasources.noop.NoopDataSource @@ -1119,16 +1119,27 @@ class AdaptiveQueryExecSuite } } - test("SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of write commands") { + test("SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) withTable("t1") { - val 
plan = sql("CREATE TABLE t1 USING parquet AS SELECT 1 col").queryExecution.executedPlan - assert(plan.isInstanceOf[CommandResultExec]) - val commandResultExec = plan.asInstanceOf[CommandResultExec] - assert(commandResultExec.commandPhysicalPlan.isInstanceOf[DataWritingCommandExec]) - assert(commandResultExec.commandPhysicalPlan.asInstanceOf[DataWritingCommandExec] - .child.isInstanceOf[AdaptiveSparkPlanExec]) + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + assert(plan.asInstanceOf[V2TableWriteExec].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) } } } @@ -1179,7 +1190,10 @@ class AdaptiveQueryExecSuite override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { case SparkListenerSQLAdaptiveExecutionUpdate(_, _, planInfo) => - assert(planInfo.nodeName == "Execute CreateDataSourceTableAsSelectCommand") + assert(planInfo.nodeName == "AdaptiveSparkPlan") + assert(planInfo.children.size == 1) + assert(planInfo.children.head.nodeName == + "Execute CreateDataSourceTableAsSelectCommand") checkDone = true case _ => // ignore other events } @@ -1584,9 +1598,8 @@ class AdaptiveQueryExecSuite var noLocalread: Boolean = false val listener = new QueryExecutionListener { override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { - qe.executedPlan match { - case plan@(_: DataWritingCommandExec | _: V2TableWriteExec) => - assert(plan.asInstanceOf[UnaryExecNode].child.isInstanceOf[AdaptiveSparkPlanExec]) + stripAQEPlan(qe.executedPlan) match { + case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) => noLocalread = collect(plan) { case exec: AQEShuffleReadExec if exec.isLocalRead => exec }.isEmpty diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala new file mode 100644 index 0000000000000..350cac4913971 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} +import org.apache.spark.sql.util.QueryExecutionListener + +abstract class V1WriteCommandSuiteBase extends QueryTest with SQLTestUtils { + + import testImplicits._ + + setupTestData() + + protected override def beforeAll(): Unit = { + super.beforeAll() + (0 to 20).map(i => (i, i % 5, (i % 10).toString)) + .toDF("i", "j", "k") + .write + .saveAsTable("t0") + } + + protected override def afterAll(): Unit = { + sql("drop table if exists t0") + super.afterAll() + } + + protected def withPlannedWrite(testFunc: Boolean => Any): Unit = { + Seq(true, false).foreach { enabled => + withSQLConf(SQLConf.PLANNED_WRITE_ENABLED.key -> enabled.toString) { + testFunc(enabled) + } + } + } + + // Execute a write query and check ordering of the plan. + protected def executeAndCheckOrdering( + hasLogicalSort: Boolean, orderingMatched: Boolean)(query: => Unit): Unit = { + var optimizedPlan: LogicalPlan = null + + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + qe.optimizedPlan match { + case w: V1WriteCommand => + optimizedPlan = w.query + case _ => + } + } + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + + query + + // Check whether the output ordering is matched before FileFormatWriter executes rdd. + assert(FileFormatWriter.outputOrderingMatched == orderingMatched, + s"Expect: $orderingMatched, Actual: ${FileFormatWriter.outputOrderingMatched}") + + sparkContext.listenerBus.waitUntilEmpty() + + // Check whether a logical sort node is at the top of the logical plan of the write query. 
+ if (optimizedPlan != null) { + assert(optimizedPlan.isInstanceOf[Sort] == hasLogicalSort, + s"Expect hasLogicalSort: $hasLogicalSort, Actual: ${optimizedPlan.isInstanceOf[Sort]}") + } + + spark.listenerManager.unregister(listener) + } +} + +class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSession { + + import testImplicits._ + + test("v1 write without partition columns") { + withPlannedWrite { enabled => + withTable("t") { + executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) { + sql("CREATE TABLE t USING PARQUET AS SELECT * FROM t0") + } + } + } + } + + test("v1 write with non-string partition columns") { + withPlannedWrite { enabled => + withTable("t") { + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + sql("CREATE TABLE t USING PARQUET PARTITIONED BY (j) AS SELECT i, k, j FROM t0") + } + } + } + } + + test("v1 write with string partition columns") { + withPlannedWrite { enabled => + withTable("t") { + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + sql("CREATE TABLE t USING PARQUET PARTITIONED BY (k) AS SELECT * FROM t0") + } + } + } + } + + test("v1 write with partition, bucketed and sort columns") { + withPlannedWrite { enabled => + withTable("t") { + sql( + """ + |CREATE TABLE t(i INT, j INT) USING PARQUET + |PARTITIONED BY (k STRING) + |CLUSTERED BY (i, j) SORTED BY (j) INTO 2 BUCKETS + |""".stripMargin) + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + sql("INSERT INTO t SELECT * FROM t0") + } + } + } + } + + test("v1 write with already sorted plan - non-string partition column") { + withPlannedWrite { enabled => + withTable("t") { + sql( + """ + |CREATE TABLE t(i INT, k STRING) USING PARQUET + |PARTITIONED BY (j INT) + |""".stripMargin) + executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) { + sql("INSERT INTO t SELECT i, k, j FROM t0 ORDER BY j") + } + } + } + } + + test("v1 write with already sorted plan - string partition column") { + withPlannedWrite { enabled => + withTable("t") { + sql( + """ + |CREATE TABLE t(i INT, j INT) USING PARQUET + |PARTITIONED BY (k STRING) + |""".stripMargin) + executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) { + sql("INSERT INTO t SELECT * FROM t0 ORDER BY k") + } + } + } + } + + test("v1 write with null and empty string column values") { + withPlannedWrite { enabled => + withTempPath { path => + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + Seq((0, None), (1, Some("")), (2, None), (3, Some("x"))) + .toDF("id", "p") + .write + .partitionBy("p") + .parquet(path.toString) + checkAnswer( + spark.read.parquet(path.toString).where("p IS NULL").sort($"id"), + Seq(Row(0, null), Row(1, null), Row(2, null))) + // Check the empty string and null values should be written to the same file. 
+ val files = path.listFiles().filterNot( + f => f.getName.startsWith(".") || f.getName.startsWith("_")) + assert(files.length == 2) + } + } + } + } + + test("v1 write with AQE changing SMJ to BHJ") { + withPlannedWrite { enabled => + withTable("t") { + sql( + """ + |CREATE TABLE t(key INT, value STRING) USING PARQUET + |PARTITIONED BY (a INT) + |""".stripMargin) + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { + // The top level sort added by V1 write will be removed by the physical rule + // RemoveRedundantSorts initially, and during the execution AQE will change + // SMJ to BHJ which will remove the original output ordering from the SMJ. + // In this case AQE should still add back the sort node from the logical plan + // during re-planning, and ordering should be matched in FileFormatWriter. + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + sql( + """ + |INSERT INTO t + |SELECT key, value, a + |FROM testData JOIN testData2 ON key = a + |WHERE value = '1' + |""".stripMargin) + } + } + } + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 96b41dd8e35fa..55644e6a341fd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -21,16 +21,17 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.{DataWritingCommand, DDLUtils} -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation, V1WriteCommand, V1WritesUtils} import org.apache.spark.sql.hive.HiveSessionCatalog import org.apache.spark.util.Utils -trait CreateHiveTableAsSelectBase extends DataWritingCommand { +trait CreateHiveTableAsSelectBase extends V1WriteCommand with V1WritesHiveUtils { val tableDesc: CatalogTable val query: LogicalPlan val outputColumnNames: Seq[String] @@ -38,6 +39,21 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand { protected val tableIdentifier = tableDesc.identifier + override def requiredOrdering: Seq[SortOrder] = { + // If the table does not exist the schema should always be empty. + val table = if (tableDesc.schema.isEmpty) { + val tableSchema = CharVarcharUtils.getRawSchema(outputColumns.toStructType, conf) + tableDesc.copy(schema = tableSchema) + } else { + tableDesc + } + // For CTAS, there is no static partition values to insert. 
+ val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap + val partitionColumns = getDynamicPartitionColumns(table, partition, query) + val options = getOptionsWithHiveBucketWrite(tableDesc.bucketSpec) + V1WritesUtils.getSortOrder(outputColumns, partitionColumns, tableDesc.bucketSpec, options) + } + override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val tableExists = catalog.tableExists(tableIdentifier) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 8fca95130dd8f..dcaeac63fb2b3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -17,23 +17,20 @@ package org.apache.spark.sql.hive.execution -import java.util.Locale - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.ql.ErrorMsg import org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.execution.datasources.{V1WriteCommand, V1WritesUtils} import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.client.HiveClientImpl @@ -76,7 +73,14 @@ case class InsertIntoHiveTable( query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean, - outputColumnNames: Seq[String]) extends SaveAsHiveFile { + outputColumnNames: Seq[String] + ) extends SaveAsHiveFile with V1WriteCommand with V1WritesHiveUtils { + + override def requiredOrdering: Seq[SortOrder] = { + val partitionColumns = getDynamicPartitionColumns(table, partition, query) + val options = getOptionsWithHiveBucketWrite(table.bucketSpec) + V1WritesUtils.getSortOrder(outputColumns, partitionColumns, table.bucketSpec, options) + } /** * Inserts all the rows in the table into Hive. Row objects are properly serialized with the @@ -133,53 +137,8 @@ case class InsertIntoHiveTable( val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) val numDynamicPartitions = partition.values.count(_.isEmpty) - val numStaticPartitions = partition.values.count(_.nonEmpty) - val partitionSpec = partition.map { - case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME - case (key, Some(value)) => key -> value - case (key, None) => key -> "" - } - - // All partition column names in the format of "//..." 
- val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") - val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty) - - // By this time, the partition map must match the table's partition columns - if (partitionColumnNames.toSet != partition.keySet) { - throw QueryExecutionErrors.requestedPartitionsMismatchTablePartitionsError(table, partition) - } - - // Validate partition spec if there exist any dynamic partitions - if (numDynamicPartitions > 0) { - // Report error if dynamic partitioning is not enabled - if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) { - throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg) - } - - // Report error if dynamic partition strict mode is on but no static partition is found - if (numStaticPartitions == 0 && - hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) { - throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg) - } - - // Report error if any static partition appears after a dynamic partition - val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty) - if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) { - throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg) - } - } - - val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name => - val attr = query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse { - throw QueryCompilationErrors.cannotResolveAttributeError( - name, query.output.map(_.name).mkString(", ")) - }.asInstanceOf[Attribute] - // SPARK-28054: Hive metastore is not case preserving and keeps partition columns - // with lower cased names. Hive will validate the column names in the partition directories - // during `loadDynamicPartitions`. Spark needs to write partition directories with lower-cased - // column names in order to make `loadDynamicPartitions` work. - attr.withName(name.toLowerCase(Locale.ROOT)) - } + val partitionSpec = getPartitionSpec(partition) + val partitionAttributes = getDynamicPartitionColumns(table, partition, query) val writtenParts = saveAsHiveFile( sparkSession = sparkSession, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala index 7f885729bd2be..799cea42e1e8a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -37,13 +37,13 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.DataWritingCommand -import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileFormatWriter} +import org.apache.spark.sql.execution.datasources.FileFormatWriter import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.client.HiveVersion // Base trait from which all hive insert statement physical execution extends. 
-private[hive] trait SaveAsHiveFile extends DataWritingCommand { +private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveUtils { var createdTempDir: Option[Path] = None @@ -86,9 +86,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { jobId = java.util.UUID.randomUUID().toString, outputPath = outputLocation) - val options = bucketSpec - .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true")) - .getOrElse(Map.empty) + val options = getOptionsWithHiveBucketWrite(bucketSpec) FileFormatWriter.write( sparkSession = sparkSession, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala new file mode 100644 index 0000000000000..752753f334a23 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import java.util.Locale + +import org.apache.hadoop.hive.ql.ErrorMsg +import org.apache.hadoop.hive.ql.plan.TableDesc + +import org.apache.spark.SparkException +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, ExternalCatalogUtils} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.execution.datasources.BucketingUtils +import org.apache.spark.sql.hive.client.HiveClientImpl + +trait V1WritesHiveUtils { + def getPartitionSpec(partition: Map[String, Option[String]]): Map[String, String] = { + partition.map { + case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME + case (key, Some(value)) => key -> value + case (key, None) => key -> "" + } + } + + def getDynamicPartitionColumns( + table: CatalogTable, + partition: Map[String, Option[String]], + query: LogicalPlan): Seq[Attribute] = { + val numDynamicPartitions = partition.values.count(_.isEmpty) + val numStaticPartitions = partition.values.count(_.nonEmpty) + val partitionSpec = getPartitionSpec(partition) + + val hiveQlTable = HiveClientImpl.toHiveTable(table) + val tableDesc = new TableDesc( + hiveQlTable.getInputFormatClass, + hiveQlTable.getOutputFormatClass, + hiveQlTable.getMetadata + ) + + // All partition column names in the format of "//..." 
+ val partitionColumns = tableDesc.getProperties.getProperty("partition_columns") + val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty) + + // By this time, the partition map must match the table's partition columns + if (partitionColumnNames.toSet != partition.keySet) { + throw QueryExecutionErrors.requestedPartitionsMismatchTablePartitionsError(table, partition) + } + + val sessionState = SparkSession.active.sessionState + val hadoopConf = sessionState.newHadoopConf() + + // Validate partition spec if there exist any dynamic partitions + if (numDynamicPartitions > 0) { + // Report error if dynamic partitioning is not enabled + if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) { + throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg) + } + + // Report error if dynamic partition strict mode is on but no static partition is found + if (numStaticPartitions == 0 && + hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) { + throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg) + } + + // Report error if any static partition appears after a dynamic partition + val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty) + if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) { + throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg) + } + } + + partitionColumnNames.takeRight(numDynamicPartitions).map { name => + val attr = query.resolve(name :: Nil, sessionState.analyzer.resolver).getOrElse { + throw QueryCompilationErrors.cannotResolveAttributeError( + name, query.output.map(_.name).mkString(", ")) + }.asInstanceOf[Attribute] + // SPARK-28054: Hive metastore is not case preserving and keeps partition columns + // with lower cased names. Hive will validate the column names in the partition directories + // during `loadDynamicPartitions`. Spark needs to write partition directories with lower-cased + // column names in order to make `loadDynamicPartitions` work. + attr.withName(name.toLowerCase(Locale.ROOT)) + } + } + + def getOptionsWithHiveBucketWrite(bucketSpec: Option[BucketSpec]): Map[String, String] = { + bucketSpec + .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true")) + .getOrElse(Map.empty) + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala new file mode 100644 index 0000000000000..2c8b200150194 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution.command + +import org.apache.spark.sql.execution.datasources.V1WriteCommandSuiteBase +import org.apache.spark.sql.hive.test.TestHiveSingleton + +class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with TestHiveSingleton { + + test("create hive table as select - no partition column") { + withPlannedWrite { enabled => + withTable("t") { + executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) { + sql("CREATE TABLE t AS SELECT * FROM t0") + } + } + } + } + + test("create hive table as select") { + withPlannedWrite { enabled => + withTable("t") { + withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + sql( + """ + |CREATE TABLE t + |PARTITIONED BY (k) + |AS SELECT * FROM t0 + |""".stripMargin) + } + } + } + } + } + + test("insert into hive table") { + withPlannedWrite { enabled => + withTable("t") { + sql( + """ + |CREATE TABLE t (i INT, j INT) + |PARTITIONED BY (k STRING) + |CLUSTERED BY (i, j) SORTED BY (j) INTO 2 BUCKETS + |""".stripMargin) + withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + sql("INSERT INTO t SELECT * FROM t0") + } + } + } + } + } + + test("insert overwrite hive table") { + withPlannedWrite { enabled => + withTable("t") { + withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { + sql( + """ + |CREATE TABLE t + |PARTITIONED BY (k) + |AS SELECT * FROM t0 + |""".stripMargin) + executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) { + sql("INSERT OVERWRITE t SELECT j AS i, i AS j, k FROM t0") + } + } + } + } + } + + test("insert into hive table with static partitions only") { + withPlannedWrite { enabled => + withTable("t") { + sql( + """ + |CREATE TABLE t (i INT, j INT) + |PARTITIONED BY (k STRING) + |""".stripMargin) + // No dynamic partition so no sort is needed. + executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) { + sql("INSERT INTO t PARTITION (k='0') SELECT i, j FROM t0 WHERE k = '0'") + } + } + } + } +}
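
Note (not part of the patch): the sketch below is a hypothetical illustration of what the new V1WritesUtils.getSortOrder helper is expected to return for a partitioned and bucketed write. The column names, the BucketSpec, and the driver object are invented for illustration only; the expected shape follows the rule in the patch — partition columns first, then the bucket id expression, then the bucket sort columns, all ascending, unless concurrent output file writers are enabled, in which case the ordering is empty.

    // Sketch only: exercises V1WritesUtils.getSortOrder from this patch with made-up columns.
    import org.apache.spark.sql.catalyst.catalog.BucketSpec
    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.execution.datasources.V1WritesUtils

    object V1WritesOrderingSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical output columns of a write query:
        // p = partition column, b = bucket column, s = bucket sort column, d = plain data column.
        val p = Symbol("p").string
        val b = Symbol("b").int
        val s = Symbol("s").int
        val d = Symbol("d").int

        // Hypothetical bucketing: 2 buckets on b, sorted by s within each bucket.
        val bucketSpec = Some(BucketSpec(2, bucketColumnNames = Seq("b"), sortColumnNames = Seq("s")))

        // With concurrent output writers disabled (the default), the required ordering
        // should come back as: p ASC, <bucket id expression over b> ASC, s ASC.
        val ordering = V1WritesUtils.getSortOrder(
          outputColumns = Seq(p, b, s, d),
          partitionColumns = Seq(p),
          bucketSpec = bucketSpec,
          options = Map.empty)

        ordering.foreach(println)
      }
    }

This is the same ordering that V1Writes wraps around the write query as a logical Sort (global = false), and that FileFormatWriter now expects to find already satisfied when spark.sql.optimizer.plannedWrite.enabled is true.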