diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 70c0d0e505f6c..20be8e539cf58 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -216,7 +216,7 @@ statement
     | SET ROLE .*?                                                     #failNativeCommand
     | SET .*?                                                          #setConfiguration
     | RESET                                                            #resetConfiguration
-    | DELETE FROM multipartIdentifier tableAlias whereClause           #deleteFromTable
+    | DELETE FROM multipartIdentifier tableAlias whereClause?          #deleteFromTable
     | unsupportedHiveNativeCommands .*?                                #failNativeCommand
     ;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 8556ff4775530..9335be5b239b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -343,12 +343,22 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
     val tableAlias = if (ctx.tableAlias() != null) {
       val ident = ctx.tableAlias().strictIdentifier()
-      if (ident != null) { Some(ident.getText) } else { None }
+      // We do not allow column aliases after the table alias.
+      if (ctx.tableAlias().identifierList() != null) {
+        throw new ParseException("Column aliases are not allowed in DELETE.",
+          ctx.tableAlias().identifierList())
+      }
+      if (ident != null) Some(ident.getText) else None
+    } else {
+      None
+    }
+    val predicate = if (ctx.whereClause() != null) {
+      Some(expression(ctx.whereClause().booleanExpression()))
     } else {
       None
     }
-    DeleteFromStatement(tableId, tableAlias, expression(ctx.whereClause().booleanExpression()))
+    DeleteFromStatement(tableId, tableAlias, predicate)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 017f31760b584..2b6378e7c7268 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -579,7 +579,7 @@ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Comm
 
 case class DeleteFromTable(
     child: LogicalPlan,
-    condition: Expression) extends Command {
+    condition: Option[Expression]) extends Command {
 
   override def children: Seq[LogicalPlan] = child :: Nil
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala
index 21e24127eee31..035bc09d31871 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala
@@ -23,5 +23,5 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 case class DeleteFromStatement(
     tableName: Seq[String],
     tableAlias: Option[String],
-    condition: Expression)
+    condition: Option[Expression])
   extends ParsedStatement
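Taken together, the grammar and plan changes let the parser emit two shapes of DeleteFromStatement. A minimal sketch of both, built from the constructors touched above (only the table name and predicate values are illustrative; the shapes mirror the parser tests below):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical.sql.DeleteFromStatement

// DELETE FROM testcat.ns1.ns2.tbl            (no alias, no WHERE clause)
val deleteAll = DeleteFromStatement(Seq("testcat", "ns1", "ns2", "tbl"), None, None)

// DELETE FROM testcat.ns1.ns2.tbl AS t WHERE t.a = 2
val deleteSome = DeleteFromStatement(
  Seq("testcat", "ns1", "ns2", "tbl"),
  Some("t"),
  Some(EqualTo(UnresolvedAttribute("t.a"), Literal(2))))
```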
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 2cb01d4e8aa5c..be781081b0112 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -20,10 +20,11 @@ package org.apache.spark.sql.catalyst.parser
 import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedRelation, UnresolvedStar}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
@@ -764,6 +765,30 @@ class DDLParserSuite extends AnalysisTest {
     assert(exc.getMessage.contains("INSERT INTO ... IF NOT EXISTS"))
   }
 
+  test("delete from table: delete all") {
+    parseCompare("DELETE FROM testcat.ns1.ns2.tbl",
+      DeleteFromStatement(
+        Seq("testcat", "ns1", "ns2", "tbl"),
+        None,
+        None))
+  }
+
+  test("delete from table: with alias and where clause") {
+    parseCompare("DELETE FROM testcat.ns1.ns2.tbl AS t WHERE t.a = 2",
+      DeleteFromStatement(
+        Seq("testcat", "ns1", "ns2", "tbl"),
+        Some("t"),
+        Some(EqualTo(UnresolvedAttribute("t.a"), Literal(2)))))
+  }
+
+  test("delete from table: column aliases are not allowed") {
+    val exc = intercept[ParseException] {
+      parsePlan("DELETE FROM testcat.ns1.ns2.tbl AS t(a,b,c,d) WHERE d = 2")
+    }
+
+    assert(exc.getMessage.contains("Column aliases are not allowed in DELETE."))
+  }
+
   test("show tables") {
     comparePlans(
       parsePlan("SHOW TABLES"),
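The same behavior is easy to poke at from a REPL. A sketch assuming spark-catalyst on the classpath; CatalystSqlParser and ParseException are the stock parser entry points, nothing else is assumed:

```scala
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}

// Parses after this patch (condition = None); was a parse error before.
CatalystSqlParser.parsePlan("DELETE FROM testcat.ns1.ns2.tbl")

// Unchanged: an explicit predicate still becomes Some(expression).
CatalystSqlParser.parsePlan("DELETE FROM testcat.ns1.ns2.tbl AS t WHERE t.a = 2")

// Column aliases after the table alias are rejected at parse time.
try CatalystSqlParser.parsePlan("DELETE FROM testcat.ns1.ns2.tbl AS t(a,b,c,d) WHERE d = 2")
catch { case e: ParseException => println(e.getMessage) }
```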
IF NOT EXISTS")) } + test("delete from table: delete all") { + parseCompare("DELETE FROM testcat.ns1.ns2.tbl", + DeleteFromStatement( + Seq("testcat", "ns1", "ns2", "tbl"), + None, + None)) + } + + test("delete from table: with alias and where clause") { + parseCompare("DELETE FROM testcat.ns1.ns2.tbl AS t WHERE t.a = 2", + DeleteFromStatement( + Seq("testcat", "ns1", "ns2", "tbl"), + Some("t"), + Some(EqualTo(UnresolvedAttribute("t.a"), Literal(2))))) + } + + test("delete from table: columns aliases is not allowed") { + val exc = intercept[ParseException] { + parsePlan("DELETE FROM testcat.ns1.ns2.tbl AS t(a,b,c,d) WHERE d = 2") + } + + assert(exc.getMessage.contains("Columns aliases is not allowed in DELETE.")) + } + test("show tables") { comparePlans( parsePlan("SHOW TABLES"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 22100c7d3d593..6285159dad9b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -251,16 +251,17 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { OverwritePartitionsDynamicExec(r.table.asWritable, r.options, planLater(query)) :: Nil case DeleteFromTable(r: DataSourceV2Relation, condition) => - if (SubqueryExpression.hasSubquery(condition)) { + if (condition.exists(SubqueryExpression.hasSubquery)) { throw new AnalysisException( s"Delete by condition with subquery is not supported: $condition") } // fail if any filter cannot be converted. correctness depends on removing all matching data. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index ef484ce6ac362..9eb8e5b3b73fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -992,7 +992,17 @@ class DataSourceV2SQLSuite
     }
   }
 
-  test("DeleteFrom: basic") {
+  test("DeleteFrom: basic - delete all") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
+      sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
+      sql(s"DELETE FROM $t")
+      checkAnswer(spark.table(t), Seq())
+    }
+  }
+
+  test("DeleteFrom: basic - delete with where clause") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
@@ -1003,12 +1013,23 @@ class DataSourceV2SQLSuite
     }
   }
 
-  test("DeleteFrom: alias") {
+  test("DeleteFrom: delete from aliased target table") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
+      sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
+      sql(s"DELETE FROM $t AS tbl WHERE tbl.id = 2")
+      checkAnswer(spark.table(t), Seq(
+        Row(3, "c", 3)))
+    }
+  }
+
+  test("DeleteFrom: normalize attribute names") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
       sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
-      sql(s"DELETE FROM $t tbl WHERE tbl.id = 2")
+      sql(s"DELETE FROM $t AS tbl WHERE tbl.ID = 2")
       checkAnswer(spark.table(t), Seq(
         Row(3, "c", 3)))
     }
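On the connector side, the new contract is that an empty filter array means an unconditional DELETE. A sketch of a minimal SupportsDelete table honoring that contract (SupportsDelete, the abstract Table methods, and sources.Filter are real DSv2 APIs; the in-memory row buffer and the EqualTo-only matcher are illustrative):

```scala
import java.util
import org.apache.spark.sql.connector.catalog.{SupportsDelete, TableCapability}
import org.apache.spark.sql.sources.{EqualTo, Filter}
import org.apache.spark.sql.types.StructType

class DemoDeletableTable extends SupportsDelete {
  // Hypothetical storage, mirroring the rows inserted by the tests above.
  private var rows = Seq((2L, "a", 2), (2L, "b", 3), (3L, "c", 3))

  override def name(): String = "demo"
  override def schema(): StructType =
    new StructType().add("id", "long").add("data", "string").add("p", "int")
  override def capabilities(): util.Set[TableCapability] = util.Collections.emptySet()

  override def deleteWhere(filters: Array[Filter]): Unit = {
    if (filters.isEmpty) {
      // DELETE FROM t without a WHERE clause: drop everything.
      rows = Seq.empty
    } else {
      // Illustrative matcher covering only EqualTo on the demo columns.
      rows = rows.filterNot { r =>
        filters.forall {
          case EqualTo("id", v) => r._1 == v
          case EqualTo("data", v) => r._2 == v
          case EqualTo("p", v) => r._3 == v
          case _ => false
        }
      }
    }
  }
}
```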