# [SPARK-28351][SQL] Support DELETE in DataSource V2 #25115
**SupportsDelete.java** (new file, 43 lines):

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.sources.Filter;

/**
 * A mix-in interface for {@link Table} delete support. Data sources can implement this
 * interface to provide the ability to delete data that matches filter expressions from a table.
 */
public interface SupportsDelete {
  /**
   * Delete data from a data source table that matches filter expressions.
   * <p>
   * Rows are deleted from the data source iff all of the filter expressions match. That is, the
   * expressions must be interpreted as a set of filters that are ANDed together.
   * <p>
   * Implementations may reject a delete operation if the delete isn't possible without significant
   * effort. For example, partitioned data sources may reject deletes that do not filter by
   * partition columns, because such a filter may require rewriting files without the deleted
   * records. To reject a delete, implementations should throw {@link IllegalArgumentException}
   * with a clear error message that identifies which expression was rejected.
   *
   * @param filters filter expressions, used to select rows to delete when all expressions match
   * @throws IllegalArgumentException if the delete is rejected due to required effort
   */
  void deleteWhere(Filter[] filters);
}
```
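To make the contract concrete, here is a minimal sketch of an implementation, assuming a hypothetical partitioned source that only supports deletes expressed on its partition columns (the class name, the `partitionColumns` set, and the `dropPartitionsMatching` helper are all invented for illustration):

```scala
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.SupportsDelete

// Hypothetical table that can only delete data by dropping whole partitions.
class PartitionedDemoTable extends SupportsDelete {
  private val partitionColumns = Set("id", "p")

  override def deleteWhere(filters: Array[Filter]): Unit = {
    // Reject the delete if any ANDed filter touches a non-partition column:
    // honoring it would mean rewriting data files, not just dropping partitions.
    filters.find(f => !f.references.forall(partitionColumns.contains)).foreach { f =>
      throw new IllegalArgumentException(s"Cannot delete by non-partition filter: $f")
    }
    dropPartitionsMatching(filters) // assumed source-specific helper
  }

  private def dropPartitionsMatching(filters: Array[Filter]): Unit = {
    // ... source-specific partition pruning and metadata update ...
  }
}
```

The check leans on the AND semantics described in the Javadoc: a row may be deleted only if every filter in the array matches, so a single filter over a non-partition column makes the whole delete unsafe for a partition-dropping source.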
**DeleteFromStatement.scala** (new file, 27 lines):

```scala
/*
 * (standard Apache license header, as above)
 */

package org.apache.spark.sql.catalyst.plans.logical.sql

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

case class DeleteFromStatement(
    tableName: Seq[String],
    tableAlias: Option[String],
    condition: Expression)
  extends ParsedStatement
```
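For reference, a statement such as `DELETE FROM testcat.ns1.ns2.tbl t WHERE t.id = 2` would be parsed into roughly the following (a sketch only; the exact condition expression depends on the parser):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}

// Roughly what the parser produces for:
//   DELETE FROM testcat.ns1.ns2.tbl t WHERE t.id = 2
val parsed = DeleteFromStatement(
  tableName = Seq("testcat", "ns1", "ns2", "tbl"),
  tableAlias = Some("t"),
  condition = EqualTo(UnresolvedAttribute(Seq("t", "id")), Literal(2)))
```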
**DataSourceResolution.scala** (updated imports):

```diff
@@ -25,10 +25,10 @@ import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, Filter, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand}
 import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation}
```
|
Second hunk, adding the `DeleteFromStatement` resolution cases:

```diff
@@ -173,9 +173,18 @@ case class DataSourceResolution(
       // only top-level adds are supported using AlterTableAddColumnsCommand
       AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField))
 
+    case DeleteFromStatement(AsTableIdentifier(table), tableAlias, condition) =>
+      throw new AnalysisException(
+        s"Delete from tables is not supported using the legacy / v1 Spark external catalog" +
+          s" API. Identifier: $table.")
+
+    case delete: DeleteFromStatement =>
+      val relation = UnresolvedRelation(delete.tableName)
+      val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
+      DeleteFromTable(aliased, delete.condition)
+
     case DataSourceV2Relation(CatalogTableAsV2(catalogTable), _, _) =>
       UnresolvedCatalogRelation(catalogTable)
   }
 
   object V1WriteProvider {
```

Review thread on the `AsTableIdentifier` case:

**Contributor (@rdblue):** Since this always throws `AnalysisException`, I think this case should be removed. Then users can still call v2 deletes for formats that provide a v2 implementation. FYI @brkyvz.

**Author:** Thank you @rdblue. Removed this case, so such statements fall through to the general `DeleteFromStatement` case below.

**Contributor:** I think it's worse to move this case from here to https://github.com/apache/spark/pull/25115/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2R657. If we can't merge these 2 cases into one here, let's keep it as it was.

**Author:** If I understand correctly, one purpose of removing the first case is that we can still execute deletes on tables that have a v2 implementation.

**Contributor (@rdblue):** I don't think we need to update that rule. There is already another rule that loads tables from a catalog; I considered updating that rule and moving the table resolution part into it. One of the reasons to do this for the insert plans is that those plans don't include the target relation as a child. Instead, those plans have the data to insert as a child node, which means that the unresolved relation won't be visible to the rule. Taking the same approach in this PR would also make this a little cleaner.

**Author:** Okay, I rolled back the resolve rules for `DeleteFromTable`.

**Contributor:** We can remove this case after #25402, which updates the relevant resolution rule.

**Author:** Saw the code in #25402. I think it's the best choice.
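For context on the child-visibility point in the thread above: `DeleteFromTable` (added elsewhere in this PR) carries the target relation as a child of the plan node, which is why the `UnresolvedRelation` built here can be reached by the analyzer's existing relation-resolution rules. A sketch of the assumed node shape, not the PR's literal definition:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

// Assumed shape: the target table is a child of the node, unlike the insert
// plans discussed above, where the child is the data to insert.
case class DeleteFromTable(
    table: LogicalPlan,
    condition: Expression) extends Command {
  override def children: Seq[LogicalPlan] = table :: Nil
}
```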
**DeleteFromTableExec.scala** (new file, 38 lines):

```scala
/*
 * (standard Apache license header, as above)
 */

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.SupportsDelete
import org.apache.spark.sql.util.CaseInsensitiveStringMap

case class DeleteFromTableExec(
    table: SupportsDelete,
    condition: Array[Filter]) extends LeafExecNode {

  override protected def doExecute(): RDD[InternalRow] = {
    table.deleteWhere(condition)
    sparkContext.emptyRDD
  }

  override def output: Seq[Attribute] = Nil
}
```
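The planner side is not shown in this excerpt. A plausible sketch of how a strategy could translate the logical `DeleteFromTable` into this node, assuming it splits the ANDed condition via `PredicateHelper` and reuses the existing `DataSourceStrategy.translateFilter` helper (the object name `DeleteFromTablePlanning` is invented; the real wiring would live in the v2 planning strategy):

```scala
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.v2.SupportsDelete

// Sketch: split the condition into ANDed conjuncts, translate each into a
// data source Filter, and fail if any predicate cannot be pushed down.
object DeleteFromTablePlanning extends PredicateHelper {
  def plan(table: SupportsDelete, condition: Expression): SparkPlan = {
    val filters = splitConjunctivePredicates(condition).map { pred =>
      DataSourceStrategy.translateFilter(pred).getOrElse(throw new AnalysisException(
        s"Cannot translate expression to source filter: $pred"))
    }.toArray
    DeleteFromTableExec(table, filters)
  }
}
```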
**DataSourceV2SQLSuite.scala**, new tests added at `@@ -1862,6 +1862,42 @@` (inside `class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn…`):

```scala
test("DeleteFrom: basic") {
  val t = "testcat.ns1.ns2.tbl"
  withTable(t) {
    sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
    sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
    sql(s"DELETE FROM $t WHERE id = 2")
    checkAnswer(spark.table(t), Seq(
      Row(3, "c", 3)))
  }
}

test("DeleteFrom: alias") {
  val t = "testcat.ns1.ns2.tbl"
  withTable(t) {
    sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
    sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
    sql(s"DELETE FROM $t tbl WHERE tbl.id = 2")
    checkAnswer(spark.table(t), Seq(
      Row(3, "c", 3)))
  }
}

test("DeleteFrom: fail if has subquery") {
  val t = "testcat.ns1.ns2.tbl"
  withTable(t) {
    sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
    sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
    val exc = intercept[AnalysisException] {
      sql(s"DELETE FROM $t WHERE id IN (SELECT id FROM $t)")
    }

    assert(spark.table(t).count === 3)
    assert(exc.getMessage.contains("Delete by condition with subquery is not supported"))
  }
}
```
Review thread on the subquery test:

**Contributor:** Can we also test a correlated subquery?

**Author:** Is it necessary to test a correlated subquery? A correlated subquery is a subset of subquery; since we forbid subqueries here, correlated subqueries are forbidden as well.

**Contributor:** This sounds reasonable to me.
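If such a test were ever added, a hypothetical sketch in the suite's style could look like this (the `EXISTS` predicate and the aliases are invented for illustration):

```scala
test("DeleteFrom: fail if has correlated subquery") {
  val t = "testcat.ns1.ns2.tbl"
  withTable(t) {
    sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
    sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
    val exc = intercept[AnalysisException] {
      // A correlated EXISTS is still a subquery expression in the condition,
      // so the same check should reject it.
      sql(s"DELETE FROM $t t1 WHERE EXISTS (SELECT 1 FROM $t t2 WHERE t2.id = t1.id)")
    }
    assert(exc.getMessage.contains("Delete by condition with subquery is not supported"))
  }
}
```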