
Commit 2eeb25e

xy_xin authored and cloud-fan committed
[SPARK-28351][SQL] Support DELETE in DataSource V2
## What changes were proposed in this pull request?

This PR adds DELETE support for V2 data sources. As a first step, it only supports delete by source filters:

```scala
void delete(Filter[] filters);
```

which cannot deal with complicated cases like subqueries. Since it is awkward to embed the implementation of DELETE in the current V2 APIs, a new data source mix-in called `SupportsMaintenance` is added, similar to `SupportsRead` and `SupportsWrite`. A data source that can be maintained is one on which we can perform DELETE/UPDATE/MERGE/OPTIMIZE, as long as it implements the necessary mix-ins.

## How was this patch tested?

New test case.

Closes #25115 from xianyinxin/SPARK-28351.

Authored-by: xy_xin <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 391c7e8 commit 2eeb25e

15 files changed, +252 -35 lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 0 deletions
```diff
@@ -214,6 +214,7 @@ statement
     | SET ROLE .*?                                                 #failNativeCommand
     | SET .*?                                                      #setConfiguration
     | RESET                                                        #resetConfiguration
+    | DELETE FROM multipartIdentifier tableAlias whereClause       #deleteFromTable
     | unsupportedHiveNativeCommands .*?                            #failNativeCommand
     ;
```
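The `tableAlias` rule elsewhere in SqlBase.g4 can match empty input, so the alias is effectively optional here, while the WHERE clause is required by this rule. A hedged sketch of statements the new rule accepts (the session and the catalog/table names are hypothetical examples, not from this commit):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session; identifiers below are examples only.
val spark = SparkSession.builder().appName("delete-demo").getOrCreate()

// DELETE FROM multipartIdentifier whereClause
spark.sql("DELETE FROM testcat.ns.tbl WHERE id = 1")

// With a table alias that the condition can reference
spark.sql("DELETE FROM testcat.ns.tbl AS t WHERE t.id < 10")
```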

sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java

Lines changed: 43 additions & 0 deletions
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.sources.Filter;

/**
 * A mix-in interface for {@link Table} delete support. Data sources can implement this
 * interface to provide the ability to delete data from tables that matches filter expressions.
 */
public interface SupportsDelete {
  /**
   * Delete data from a data source table that matches filter expressions.
   * <p>
   * Rows are deleted from the data source iff all of the filter expressions match. That is, the
   * expressions must be interpreted as a set of filters that are ANDed together.
   * <p>
   * Implementations may reject a delete operation if the delete isn't possible without significant
   * effort. For example, partitioned data sources may reject deletes that do not filter by
   * partition columns because the filter may require rewriting files without deleted records.
   * To reject a delete implementations should throw {@link IllegalArgumentException} with a clear
   * error message that identifies which expression was rejected.
   *
   * @param filters filter expressions, used to select rows to delete when all expressions match
   * @throws IllegalArgumentException If the delete is rejected due to required effort
   */
  void deleteWhere(Filter[] filters);
}
```
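To make the contract concrete, here is a minimal in-memory sketch of a `Table` mixing in `SupportsDelete`. `DemoTable`, its row store, and the single supported `EqualTo("id", ...)` filter are hypothetical, not part of this commit:

```scala
import java.util

import org.apache.spark.sql.sources.{EqualTo, Filter}
import org.apache.spark.sql.sources.v2.{SupportsDelete, Table, TableCapability}
import org.apache.spark.sql.types.StructType

// Hypothetical in-memory table with a single int column "id".
class DemoTable(var rows: Seq[Map[String, Any]]) extends Table with SupportsDelete {
  override def name(): String = "demo"
  override def schema(): StructType = new StructType().add("id", "int")
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.noneOf(classOf[TableCapability])

  override def deleteWhere(filters: Array[Filter]): Unit = {
    // The filters are ANDed together; reject anything we cannot evaluate
    // cheaply, as the Javadoc above prescribes.
    filters.foreach {
      case EqualTo("id", _) => // supported
      case other =>
        throw new IllegalArgumentException(s"Cannot delete by filter: $other")
    }
    // Keep only rows that fail at least one filter.
    rows = rows.filterNot { row =>
      filters.forall {
        case EqualTo("id", value) => row("id") == value
        case _ => false // unreachable: rejected above
      }
    }
  }
}
```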

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -1761,6 +1761,8 @@ class Analyzer(
       // Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
       case q: UnaryNode if q.childrenResolved =>
         resolveSubQueries(q, q.children)
+      case d: DeleteFromTable if d.childrenResolved =>
+        resolveSubQueries(d, d.children)
     }
   }
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 4 additions & 3 deletions
```diff
@@ -585,7 +585,7 @@ trait CheckAnalysis extends PredicateHelper {
         // Only certain operators are allowed to host subquery expression containing
         // outer references.
         plan match {
-          case _: Filter | _: Aggregate | _: Project => // Ok
+          case _: Filter | _: Aggregate | _: Project | _: DeleteFromTable => // Ok
           case other => failAnalysis(
             "Correlated scalar sub-queries can only be used in a " +
               s"Filter/Aggregate/Project: $plan")
@@ -594,9 +594,10 @@ trait CheckAnalysis extends PredicateHelper {

       case inSubqueryOrExistsSubquery =>
         plan match {
-          case _: Filter => // Ok
+          case _: Filter | _: DeleteFromTable => // Ok
           case _ =>
-            failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in a Filter: $plan")
+            failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" +
+              s" Filter/DeleteFromTable: $plan")
         }
     }
```
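Together with the Analyzer change above, a DELETE condition can now carry IN/EXISTS subqueries through analysis. A hedged sketch, reusing the hypothetical `spark` session and catalog from the earlier example; note that, per the commit message, a filter-based source still cannot execute a subquery delete, so this is about passing analysis only:

```scala
// Previously rejected with "IN/EXISTS predicate sub-queries can only be
// used in a Filter"; now DeleteFromTable is an allowed host as well.
spark.sql(
  """DELETE FROM testcat.ns.orders
    |WHERE customer_id IN (SELECT id FROM testcat.ns.closed_accounts)""".stripMargin)
```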

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 15 additions & 1 deletion
```diff
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -338,6 +338,20 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx)
   }

+  override def visitDeleteFromTable(
+      ctx: DeleteFromTableContext): LogicalPlan = withOrigin(ctx) {
+
+    val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
+    val tableAlias = if (ctx.tableAlias() != null) {
+      val ident = ctx.tableAlias().strictIdentifier()
+      if (ident != null) { Some(ident.getText) } else { None }
+    } else {
+      None
+    }
+
+    DeleteFromStatement(tableId, tableAlias, expression(ctx.whereClause().booleanExpression()))
+  }
+
   /**
    * Create a partition specification map.
    */
```
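For illustration, parsing a DELETE through the Catalyst parser yields a `DeleteFromStatement`; the comment shows the expected shape schematically (identifiers are hypothetical):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

val plan = CatalystSqlParser.parsePlan(
  "DELETE FROM testcat.ns.tbl AS t WHERE t.id = 1")
// Schematically:
//   DeleteFromStatement(
//     tableName  = Seq("testcat", "ns", "tbl"),
//     tableAlias = Some("t"),
//     condition  = the unresolved expression for t.id = 1)
```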

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 7 additions & 0 deletions
```diff
@@ -567,6 +567,13 @@ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Comm
   override val output = DescribeTableSchema.describeTableAttributes()
 }

+case class DeleteFromTable(
+    child: LogicalPlan,
+    condition: Expression) extends Command {
+
+  override def children: Seq[LogicalPlan] = child :: Nil
+}
+
 /**
  * Drop a table.
  */
```
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala

Lines changed: 27 additions & 0 deletions
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.plans.logical.sql

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

case class DeleteFromStatement(
    tableName: Seq[String],
    tableAlias: Option[String],
    condition: Expression)
  extends ParsedStatement
```

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala

Lines changed: 10 additions & 1 deletion
```diff
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources.v2

 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.sources.v2.{SupportsDelete, SupportsRead, SupportsWrite, Table, TableCapability}

 object DataSourceV2Implicits {
   implicit class TableHelper(table: Table) {
@@ -40,6 +40,15 @@ object DataSourceV2Implicits {
       }
     }

+    def asDeletable: SupportsDelete = {
+      table match {
+        case support: SupportsDelete =>
+          support
+        case _ =>
+          throw new AnalysisException(s"Table does not support deletes: ${table.name}")
+      }
+    }
+
     def supports(capability: TableCapability): Boolean = table.capabilities.contains(capability)

     def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports)
```
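A sketch of how planning code can use the new implicit; `deleteById` and its `table` parameter are hypothetical, but `asDeletable` and `deleteWhere` are exactly the APIs added by this commit:

```scala
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
import org.apache.spark.sql.sources.{EqualTo, Filter}
import org.apache.spark.sql.sources.v2.Table

// If `table` does not mix in SupportsDelete, asDeletable throws
// AnalysisException("Table does not support deletes: <name>").
def deleteById(table: Table, id: Int): Unit = {
  val filters: Array[Filter] = Array(EqualTo("id", id))
  table.asDeletable.deleteWhere(filters)
}
```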

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -531,7 +531,8 @@ class AnalysisErrorSuite extends AnalysisTest {
     val plan = Project(
       Seq(a, Alias(InSubquery(Seq(a), ListQuery(LocalRelation(b))), "c")()),
       LocalRelation(a))
-    assertAnalysisError(plan, "Predicate sub-queries can only be used in a Filter" :: Nil)
+    assertAnalysisError(plan, "Predicate sub-queries can only be used" +
+      " in Filter/DeleteFromTable" :: Nil)
   }

   test("PredicateSubQuery is used is a nested condition") {
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala

Lines changed: 13 additions & 4 deletions
```diff
@@ -25,10 +25,10 @@ import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, Filter, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand}
 import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation}
@@ -173,9 +173,18 @@ case class DataSourceResolution(
       // only top-level adds are supported using AlterTableAddColumnsCommand
       AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField))

+    case DeleteFromStatement(AsTableIdentifier(table), tableAlias, condition) =>
+      throw new AnalysisException(
+        s"Delete from tables is not supported using the legacy / v1 Spark external catalog" +
+          s" API. Identifier: $table.")
+
+    case delete: DeleteFromStatement =>
+      val relation = UnresolvedRelation(delete.tableName)
+      val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
+      DeleteFromTable(aliased, delete.condition)
+
     case DataSourceV2Relation(CatalogTableAsV2(catalogTable), _, _) =>
       UnresolvedCatalogRelation(catalogTable)
-
   }

   object V1WriteProvider {
```
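Read on its own, the v2 branch of this rule performs the following rewrite (a sketch mirroring the rule body above; the v1 branch instead fails fast with the AnalysisException shown in the diff):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.plans.logical.sql.DeleteFromStatement

// DeleteFromStatement(Seq("testcat", "ns", "tbl"), Some("t"), cond) becomes
// DeleteFromTable(SubqueryAlias("t", UnresolvedRelation(Seq(...))), cond).
def rewrite(delete: DeleteFromStatement): LogicalPlan = {
  val relation = UnresolvedRelation(delete.tableName)
  val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
  DeleteFromTable(aliased, delete.condition)
}
```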
