
Commit 3fc52b5

xy_xin authored and cloud-fan committed
[SPARK-28950][SQL] Refine the code of DELETE
### What changes were proposed in this pull request?

This PR refines the code of DELETE: (1) it makes `whereClause` optional, in which case DELETE removes all of a table's data; (2) it adds more test cases; (3) it includes a few other refinements. This is a follow-up to SPARK-28351.

### Why are the changes needed?

An optional WHERE clause in DELETE follows the SQL standard.

### Does this PR introduce any user-facing change?

Yes, but since this is an unreleased feature, the change has no end-user impact.

### How was this patch tested?

New test cases are added.

Closes #25652 from xianyinxin/SPARK-28950.

Authored-by: xy_xin <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 34915b2 commit 3fc52b5
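
For illustration, the user-facing effect of the optional WHERE clause (a hedged sketch: `testcat.ns1.ns2.tbl` is a placeholder for a table in a catalog whose source supports deletes):

```scala
// With this change, both forms are valid; the bare form removes every row.
spark.sql("DELETE FROM testcat.ns1.ns2.tbl WHERE id = 2") // delete matching rows
spark.sql("DELETE FROM testcat.ns1.ns2.tbl")              // delete all rows
```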

File tree: 7 files changed (+73, -16 lines)

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 1 deletion

```diff
@@ -216,7 +216,7 @@ statement
     | SET ROLE .*? #failNativeCommand
     | SET .*? #setConfiguration
     | RESET #resetConfiguration
-    | DELETE FROM multipartIdentifier tableAlias whereClause #deleteFromTable
+    | DELETE FROM multipartIdentifier tableAlias whereClause? #deleteFromTable
     | unsupportedHiveNativeCommands .*? #failNativeCommand
     ;
```
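
The `?` suffix makes the `whereClause` subrule optional, so a bare DELETE now parses. A minimal sketch of what this enables (hedged; it assumes `CatalystSqlParser` is wired to this grammar, as the parser suite below exercises):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// A bare DELETE now parses; with a WHERE clause it parses as before.
val plan = CatalystSqlParser.parsePlan("DELETE FROM testcat.ns1.ns2.tbl")
// Expected shape (see DDLParserSuite below):
// DeleteFromStatement(Seq("testcat", "ns1", "ns2", "tbl"), None, None)
```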

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 12 additions & 2 deletions

```diff
@@ -343,12 +343,22 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
     val tableAlias = if (ctx.tableAlias() != null) {
       val ident = ctx.tableAlias().strictIdentifier()
-      if (ident != null) { Some(ident.getText) } else { None }
+      // We do not allow columns aliases after table alias.
+      if (ctx.tableAlias().identifierList() != null) {
+        throw new ParseException("Columns aliases is not allowed in DELETE.",
+          ctx.tableAlias().identifierList())
+      }
+      if (ident != null) Some(ident.getText) else None
+    } else {
+      None
+    }
+    val predicate = if (ctx.whereClause() != null) {
+      Some(expression(ctx.whereClause().booleanExpression()))
     } else {
       None
     }
 
-    DeleteFromStatement(tableId, tableAlias, expression(ctx.whereClause().booleanExpression()))
+    DeleteFromStatement(tableId, tableAlias, predicate)
   }
 
   /**
```
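
The new guard in the visitor rejects a column-alias list after the table alias. An illustrative round trip (a sketch; the statement is hypothetical and mirrors the parser test added below):

```scala
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}

// A column-alias list such as t(a, b) is rejected before plan construction.
try {
  CatalystSqlParser.parsePlan("DELETE FROM testcat.ns1.ns2.tbl AS t(a, b) WHERE a = 1")
} catch {
  case e: ParseException =>
    assert(e.getMessage.contains("Columns aliases is not allowed in DELETE."))
}
```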

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -579,7 +579,7 @@ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Comm
 
 case class DeleteFromTable(
     child: LogicalPlan,
-    condition: Expression) extends Command {
+    condition: Option[Expression]) extends Command {
 
   override def children: Seq[LogicalPlan] = child :: Nil
 }
```
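
Since `condition` is now an `Option[Expression]`, consumers branch on its presence rather than assuming a predicate exists. A minimal sketch of that pattern (the helper is illustrative, not part of this commit):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression

// Hypothetical helper: summarize what an optional delete condition means.
def describeDelete(condition: Option[Expression]): String = condition match {
  case Some(expr) => s"delete rows matching: ${expr.sql}"
  case None       => "delete all rows of the table"
}
```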

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -23,5 +23,5 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 case class DeleteFromStatement(
     tableName: Seq[String],
     tableAlias: Option[String],
-    condition: Expression)
+    condition: Option[Expression])
   extends ParsedStatement
```
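
For reference, the two statement shapes the parser now produces under this signature (a sketch; the values echo the parser tests below):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical.sql.DeleteFromStatement

// "DELETE FROM testcat.ns1.ns2.tbl" — no alias, no predicate.
val deleteAll = DeleteFromStatement(Seq("testcat", "ns1", "ns2", "tbl"), None, None)

// "DELETE FROM testcat.ns1.ns2.tbl AS t WHERE t.a = 2"
val deleteWhere = DeleteFromStatement(
  Seq("testcat", "ns1", "ns2", "tbl"),
  Some("t"),
  Some(EqualTo(UnresolvedAttribute("t.a"), Literal(2))))
```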

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 27 additions & 2 deletions

```diff
@@ -20,10 +20,11 @@ package org.apache.spark.sql.catalyst.parser
 import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedRelation, UnresolvedStar}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
@@ -764,6 +765,30 @@ class DDLParserSuite extends AnalysisTest {
     assert(exc.getMessage.contains("INSERT INTO ... IF NOT EXISTS"))
   }
 
+  test("delete from table: delete all") {
+    parseCompare("DELETE FROM testcat.ns1.ns2.tbl",
+      DeleteFromStatement(
+        Seq("testcat", "ns1", "ns2", "tbl"),
+        None,
+        None))
+  }
+
+  test("delete from table: with alias and where clause") {
+    parseCompare("DELETE FROM testcat.ns1.ns2.tbl AS t WHERE t.a = 2",
+      DeleteFromStatement(
+        Seq("testcat", "ns1", "ns2", "tbl"),
+        Some("t"),
+        Some(EqualTo(UnresolvedAttribute("t.a"), Literal(2)))))
+  }
+
+  test("delete from table: columns aliases is not allowed") {
+    val exc = intercept[ParseException] {
+      parsePlan("DELETE FROM testcat.ns1.ns2.tbl AS t(a,b,c,d) WHERE d = 2")
+    }
+
+    assert(exc.getMessage.contains("Columns aliases is not allowed in DELETE."))
+  }
+
   test("show tables") {
     comparePlans(
       parsePlan("SHOW TABLES"),
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 7 additions & 6 deletions

```diff
@@ -251,16 +251,17 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
       OverwritePartitionsDynamicExec(r.table.asWritable, r.options, planLater(query)) :: Nil
 
     case DeleteFromTable(r: DataSourceV2Relation, condition) =>
-      if (SubqueryExpression.hasSubquery(condition)) {
+      if (condition.exists(SubqueryExpression.hasSubquery)) {
         throw new AnalysisException(
           s"Delete by condition with subquery is not supported: $condition")
       }
       // fail if any filter cannot be converted. correctness depends on removing all matching data.
-      val filters = splitConjunctivePredicates(condition).map {
-        f => DataSourceStrategy.translateFilter(f).getOrElse(
-          throw new AnalysisException(s"Exec delete failed:" +
-            s" cannot translate expression to source filter: $f"))
-      }.toArray
+      val filters = DataSourceStrategy.normalizeFilters(condition.toSeq, r.output)
+        .flatMap(splitConjunctivePredicates(_).map {
+          f => DataSourceStrategy.translateFilter(f).getOrElse(
+            throw new AnalysisException(s"Exec update failed:" +
+              s" cannot translate expression to source filter: $f"))
+        }).toArray
       DeleteFromTableExec(r.table.asDeletable, filters) :: Nil
 
     case WriteToContinuousDataSource(writer, query) =>
```
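
The `Option` plumbing is what lets an unconditioned DELETE flow through this case: `exists` returns false on `None`, so the subquery check is skipped, and `condition.toSeq` is empty, so the translated filter array is empty as well. A standalone sketch of those semantics:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression

// Plain Option behavior relied on above (no Spark session needed).
val condition: Option[Expression] = None // a bare "DELETE FROM t"
assert(!condition.exists(_ => true))     // the subquery check never fires
assert(condition.toSeq.isEmpty)          // normalizeFilters receives an empty Seq
// => filters is empty, and the deletable source is asked to delete with no constraints
```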

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 24 additions & 3 deletions

```diff
@@ -992,7 +992,17 @@ class DataSourceV2SQLSuite
     }
   }
 
-  test("DeleteFrom: basic") {
+  test("DeleteFrom: basic - delete all") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
+      sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
+      sql(s"DELETE FROM $t")
+      checkAnswer(spark.table(t), Seq())
+    }
+  }
+
+  test("DeleteFrom: basic - delete with where clause") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
@@ -1003,12 +1013,23 @@ class DataSourceV2SQLSuite
     }
   }
 
-  test("DeleteFrom: alias") {
+  test("DeleteFrom: delete from aliased target table") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
+      sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
+      sql(s"DELETE FROM $t AS tbl WHERE tbl.id = 2")
+      checkAnswer(spark.table(t), Seq(
+        Row(3, "c", 3)))
+    }
+  }
+
+  test("DeleteFrom: normalize attribute names") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
       sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
-      sql(s"DELETE FROM $t tbl WHERE tbl.id = 2")
+      sql(s"DELETE FROM $t AS tbl WHERE tbl.ID = 2")
       checkAnswer(spark.table(t), Seq(
         Row(3, "c", 3)))
     }
```
