Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ statement
| SET ROLE .*? #failNativeCommand
| SET .*? #setConfiguration
| RESET #resetConfiguration
| DELETE FROM multipartIdentifier tableAlias whereClause #deleteFromTable
| DELETE FROM multipartIdentifier tableAlias whereClause? #deleteFromTable
| unsupportedHiveNativeCommands .*? #failNativeCommand
;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,22 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
val tableAlias = if (ctx.tableAlias() != null) {
val ident = ctx.tableAlias().strictIdentifier()
if (ident != null) { Some(ident.getText) } else { None }
// We do not allow columns aliases after table alias.
if (ctx.tableAlias().identifierList() != null) {
throw new ParseException("Columns aliases is not allowed in DELETE.",
ctx.tableAlias().identifierList())
}
if (ident != null) Some(ident.getText) else None
} else {
None
}
val predicate = if (ctx.whereClause() != null) {
Some(expression(ctx.whereClause().booleanExpression()))
} else {
None
}

DeleteFromStatement(tableId, tableAlias, expression(ctx.whereClause().booleanExpression()))
DeleteFromStatement(tableId, tableAlias, predicate)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Comm

case class DeleteFromTable(
child: LogicalPlan,
condition: Expression) extends Command {
condition: Option[Expression]) extends Command {

override def children: Seq[LogicalPlan] = child :: Nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
case class DeleteFromStatement(
tableName: Seq[String],
tableAlias: Option[String],
condition: Expression)
condition: Option[Expression])
extends ParsedStatement
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -764,6 +765,30 @@ class DDLParserSuite extends AnalysisTest {
assert(exc.getMessage.contains("INSERT INTO ... IF NOT EXISTS"))
}

test("delete from table: delete all") {
parseCompare("DELETE FROM testcat.ns1.ns2.tbl",
DeleteFromStatement(
Seq("testcat", "ns1", "ns2", "tbl"),
None,
None))
}

test("delete from table: with alias and where clause") {
parseCompare("DELETE FROM testcat.ns1.ns2.tbl AS t WHERE t.a = 2",
DeleteFromStatement(
Seq("testcat", "ns1", "ns2", "tbl"),
Some("t"),
Some(EqualTo(UnresolvedAttribute("t.a"), Literal(2)))))
}

test("delete from table: columns aliases is not allowed") {
val exc = intercept[ParseException] {
parsePlan("DELETE FROM testcat.ns1.ns2.tbl AS t(a,b,c,d) WHERE d = 2")
}

assert(exc.getMessage.contains("Columns aliases is not allowed in DELETE."))
}

test("show tables") {
comparePlans(
parsePlan("SHOW TABLES"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,16 +251,17 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
OverwritePartitionsDynamicExec(r.table.asWritable, r.options, planLater(query)) :: Nil

case DeleteFromTable(r: DataSourceV2Relation, condition) =>
if (SubqueryExpression.hasSubquery(condition)) {
if (condition.exists(SubqueryExpression.hasSubquery)) {
throw new AnalysisException(
s"Delete by condition with subquery is not supported: $condition")
}
// fail if any filter cannot be converted. correctness depends on removing all matching data.
val filters = splitConjunctivePredicates(condition).map {
f => DataSourceStrategy.translateFilter(f).getOrElse(
throw new AnalysisException(s"Exec delete failed:" +
s" cannot translate expression to source filter: $f"))
}.toArray
val filters = DataSourceStrategy.normalizeFilters(condition.toSeq, r.output)
.flatMap(splitConjunctivePredicates(_).map {
f => DataSourceStrategy.translateFilter(f).getOrElse(
throw new AnalysisException(s"Exec update failed:" +
s" cannot translate expression to source filter: $f"))
}).toArray
DeleteFromTableExec(r.table.asDeletable, filters) :: Nil

case WriteToContinuousDataSource(writer, query) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,17 @@ class DataSourceV2SQLSuite
}
}

test("DeleteFrom: basic") {
test("DeleteFrom: basic - delete all") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
sql(s"DELETE FROM $t")
checkAnswer(spark.table(t), Seq())
}
}

test("DeleteFrom: basic - delete with where clause") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
Expand All @@ -1003,12 +1013,23 @@ class DataSourceV2SQLSuite
}
}

test("DeleteFrom: alias") {
test("DeleteFrom: delete from aliased target table") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
sql(s"DELETE FROM $t AS tbl WHERE tbl.id = 2")
checkAnswer(spark.table(t), Seq(
Row(3, "c", 3)))
}
}

test("DeleteFrom: normalize attribute names") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
sql(s"DELETE FROM $t tbl WHERE tbl.id = 2")
sql(s"DELETE FROM $t AS tbl WHERE tbl.ID = 2")
checkAnswer(spark.table(t), Seq(
Row(3, "c", 3)))
}
Expand Down