diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 2d1964f6a217..4436c6b24f7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -24,8 +24,8 @@ import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, ProjectingInternalRow}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, TableSpec, UnaryNode}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, TableSpec, UnaryNode}
 import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, ReplaceDataProjections, WriteDeltaProjections}
 import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, REINSERT_OPERATION, UPDATE_OPERATION, WRITE_OPERATION, WRITE_WITH_METADATA_OPERATION}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege}
@@ -89,7 +89,7 @@ case class CreateTableAsSelectExec(
       .build()
     val table = Option(catalog.createTable(ident, tableInfo))
       .getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = false)
   }
 }
 
@@ -130,7 +130,7 @@ case class AtomicCreateTableAsSelectExec(
       .build()
     val stagedTable = Option(catalog.stageCreate(ident, tableInfo)
       ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, stagedTable, writeOptions, ident, query)
+    writeToTable(catalog, stagedTable, writeOptions, ident, query, overwrite = false)
   }
 }
 
@@ -180,7 +180,7 @@ case class ReplaceTableAsSelectExec(
       .build()
     val table = Option(catalog.createTable(ident, tableInfo))
       .getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = true)
   }
 }
 
@@ -242,7 +242,7 @@ case class AtomicReplaceTableAsSelectExec(
     }
     val table = Option(staged).getOrElse(
       catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = true)
   }
 }
 
@@ -697,15 +697,18 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
       table: Table,
       writeOptions: Map[String, String],
       ident: Identifier,
-      query: LogicalPlan): Seq[InternalRow] = {
+      query: LogicalPlan,
+      overwrite: Boolean): Seq[InternalRow] = {
     Utils.tryWithSafeFinallyAndFailureCallbacks({
       val relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
-      val append = AppendData.byPosition(relation, query, writeOptions)
-      val qe = session.sessionState.executePlan(append)
+      val writeCommand = if (overwrite) {
+        OverwriteByExpression.byPosition(relation, query, Literal.TrueLiteral, writeOptions)
+      } else {
+        AppendData.byPosition(relation, query, writeOptions)
+      }
+      val qe = session.sessionState.executePlan(writeCommand)
       qe.assertCommandExecuted()
-      DataSourceV2Utils.commitStagedChanges(sparkContext, table, metrics)
-      Nil
     })(catchBlock = {
       table match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index f5ca885b1ad6..3eeed2e41754 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -819,7 +819,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
            |OPTIONS (PATH '$path')
            |AS VALUES (2, 3)
            |""".stripMargin)
-      checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(0, 1), Row(1, 2), Row(2, 3)))
+      checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3)))
       // Replace the table without the path options.
       sql(
         s"""