Commit 176e92a
[SPARK-51936][SQL] ReplaceTableAsSelect should overwrite the new table instead of append
### What changes were proposed in this pull request?

For the file source v1, if you run

```
Seq(1 -> "a").toDF().write.option("path", p).saveAsTable("t")
Seq(2 -> "b").toDF().write.mode("overwrite").option("path", p).saveAsTable("t")
```

the final data of `t` is `[2, "b"]`, because the v1 command `CreateDataSourceTableAsSelectCommand` writes the data to the file directory in `Overwrite` mode. With DS v2, we use the v2 command `ReplaceTableAsSelect`, which writes to the new table with `AppendData`. If the new table still contains the old data, which can happen for file source tables because DROP TABLE does not delete the external location, the behavior differs from file source v1. This PR fixes the inconsistency by using `OverwriteByExpression` in the `ReplaceTableAsSelect` physical commands.

### Why are the changes needed?

It fixes a potential inconsistency between file source v1 and v2. For now we are fine, as we don't support file source v2 tables yet, but the fix is also helpful for third-party v2 sources that may retain old data in a newly created table.

### Does this PR introduce any user-facing change?

No, file source v2 tables are not supported yet.

### How was this patch tested?

Updated an existing test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50739 from cloud-fan/RTAS.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
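For illustration, here is a minimal sketch of the v2 statement shape involved, assuming a session with a v2 test catalog registered under the name `testcat` (the catalog setup and table names are assumptions for this sketch, not part of the commit):

```scala
// Sketch only. Assumes a SparkSession `spark` with a v2 catalog registered, e.g.
//   spark.conf.set("spark.sql.catalog.testcat",
//     "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog")
// (a test-only catalog; the issue itself matters for sources whose tables can
// retain old data across a replace, such as tables backed by an external location).

spark.sql("CREATE TABLE testcat.t AS SELECT 1 AS id, 'a' AS v")

// CREATE OR REPLACE TABLE ... AS SELECT runs the ReplaceTableAsSelect command.
// Before this commit it wrote with AppendData, so a replaced table that still
// held the old files would expose both [1, a] and [2, b]. After this commit it
// writes with OverwriteByExpression(true), so only the new row remains.
spark.sql("CREATE OR REPLACE TABLE testcat.t AS SELECT 2 AS id, 'b' AS v")

spark.table("testcat.t").show()  // expected: one row, [2, b]
```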
1 parent: eb6cc4c

File tree: 1 file changed, +14 -11 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala

Lines changed: 14 additions & 11 deletions
```diff
@@ -24,8 +24,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, TableSpec, UnaryNode}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, TableSpec, UnaryNode}
 import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, WriteDeltaProjections}
 import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, UPDATE_OPERATION}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableWritePrivilege}
@@ -85,7 +85,7 @@ case class CreateTableAsSelectExec(
         ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
         partitioning.toArray, properties.asJava)
     ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = false)
   }
 }
@@ -120,7 +120,7 @@ case class AtomicCreateTableAsSelectExec(
         ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
         partitioning.toArray, properties.asJava)
     ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, stagedTable, writeOptions, ident, query)
+    writeToTable(catalog, stagedTable, writeOptions, ident, query, overwrite = false)
   }
 }
@@ -167,7 +167,7 @@ case class ReplaceTableAsSelectExec(
         ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
         partitioning.toArray, properties.asJava)
     ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = true)
   }
 }
@@ -218,7 +218,7 @@ case class AtomicReplaceTableAsSelectExec(
     }
     val table = Option(staged).getOrElse(
       catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = true)
   }
 }
@@ -574,18 +574,21 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
       table: Table,
       writeOptions: Map[String, String],
       ident: Identifier,
-      query: LogicalPlan): Seq[InternalRow] = {
+      query: LogicalPlan,
+      overwrite: Boolean): Seq[InternalRow] = {
     Utils.tryWithSafeFinallyAndFailureCallbacks({
       val relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
-      val append = AppendData.byPosition(relation, query, writeOptions)
-      val qe = session.sessionState.executePlan(append)
+      val writeCommand = if (overwrite) {
+        OverwriteByExpression.byPosition(relation, query, Literal.TrueLiteral, writeOptions)
+      } else {
+        AppendData.byPosition(relation, query, writeOptions)
+      }
+      val qe = session.sessionState.executePlan(writeCommand)
       qe.assertCommandExecuted()
-
       table match {
         case st: StagedTable => st.commitStagedChanges()
         case _ =>
       }
-
       Nil
     })(catchBlock = {
       table match {
```
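After this change, RTAS plans an `OverwriteByExpression` whose condition is the literal `true`, which Spark hands to the connector as a truncate-style overwrite. As a rough illustration (hypothetical connector code, not from this commit), a v2 connector opts into that behavior through the `SupportsTruncate` mix-in on its `WriteBuilder`:

```scala
import org.apache.spark.sql.connector.write.{SupportsTruncate, Write, WriteBuilder}

// Hypothetical connector code, for illustration only: the class name and the
// doTruncate flag are invented; only WriteBuilder, SupportsTruncate, and Write
// are real DataSource V2 interfaces.
class ExampleWriteBuilder extends WriteBuilder with SupportsTruncate {
  // Whether this write should replace all existing rows instead of appending.
  private var doTruncate = false

  // Spark calls truncate() when it plans OverwriteByExpression with a `true`
  // condition, which is exactly what ReplaceTableAsSelect issues after this commit.
  override def truncate(): WriteBuilder = {
    doTruncate = true
    this
  }

  override def build(): Write = {
    // A real connector would return a Write that drops the old data when
    // doTruncate is set; left unimplemented in this sketch.
    ???
  }
}
```

A `WriteBuilder` that does not implement `SupportsTruncate` (or `SupportsOverwrite`) cannot satisfy the overwrite request, so connectors that only support appends will now surface that limitation at planning time instead of silently appending into a replaced table.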
