Commit b05f2c2

[SPARK-38085][SQL] DS V2: Handle DELETE commands for group-based sources
1 parent 7a6b989 commit b05f2c2

23 files changed: +1403 -24 lines
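
For context, the change targets plain SQL DELETE statements against DS V2 tables. A hedged example of the kind of statement the new rewrite handles (the catalog, namespace, and table names are hypothetical, and the target table's connector is assumed to implement SupportsRowLevelOperations):

// Illustrative only: a DELETE that the analyzer now rewrites into a group-based
// ReplaceData plan; `spark` is an active SparkSession and `cat.db.events` is a
// hypothetical DS V2 table backed by a connector with SupportsRowLevelOperations.
spark.sql("DELETE FROM cat.db.events WHERE event_date < DATE '2022-01-01'")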

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 0 deletions
@@ -319,6 +319,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       ResolveRandomSeed ::
       ResolveBinaryArithmetic ::
       ResolveUnion ::
+      RewriteDeleteFromTable ::
       typeCoercionRules ++
       Seq(ResolveWithCTE) ++
       extendedResolutionRules : _*),
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala

Lines changed: 89 additions & 0 deletions

@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression, Not}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, LogicalPlan, ReplaceData}
+import org.apache.spark.sql.connector.catalog.{SupportsDelete, SupportsRowLevelOperations, TruncatableTable}
+import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
+import org.apache.spark.sql.connector.write.RowLevelOperationTable
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * A rule that rewrites DELETE operations using plans that operate on individual or groups of rows.
+ *
+ * If a table implements [[SupportsDelete]] and [[SupportsRowLevelOperations]], this rule will
+ * still rewrite the DELETE operation but the optimizer will check whether this particular DELETE
+ * statement can be handled by simply passing delete filters to the connector. If so, the optimizer
+ * will discard the rewritten plan and will allow the data source to delete using filters.
+ */
+object RewriteDeleteFromTable extends RewriteRowLevelCommand {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case d @ DeleteFromTable(aliasedTable, cond) if d.resolved =>
+      EliminateSubqueryAliases(aliasedTable) match {
+        case DataSourceV2Relation(_: TruncatableTable, _, _, _, _) if cond == TrueLiteral =>
+          // don't rewrite as the table supports truncation
+          d
+
+        case r @ DataSourceV2Relation(t: SupportsRowLevelOperations, _, _, _, _) =>
+          val table = buildOperationTable(t, DELETE, CaseInsensitiveStringMap.empty())
+          buildReplaceDataPlan(r, table, cond)
+
+        case DataSourceV2Relation(_: SupportsDelete, _, _, _, _) =>
+          // don't rewrite as the table supports deletes only with filters
+          d
+
+        case DataSourceV2Relation(t, _, _, _, _) =>
+          throw QueryCompilationErrors.tableDoesNotSupportDeletesError(t)
+
+        case _ =>
+          d
+      }
+  }
+
+  // build a rewrite plan for sources that support replacing groups of data (e.g. files, partitions)
+  private def buildReplaceDataPlan(
+      relation: DataSourceV2Relation,
+      operationTable: RowLevelOperationTable,
+      cond: Expression): ReplaceData = {
+
+    // resolve all required metadata attrs that may be used for grouping data on write
+    // for instance, JDBC data source may cluster data by shard/host before writing
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, operationTable.operation)
+
+    // construct a read relation and include all required metadata columns
+    val readRelation = buildRelationWithAttrs(relation, operationTable, metadataAttrs)
+
+    // construct a plan that contains unmatched rows in matched groups that must be carried over
+    // such rows do not match the condition but have to be copied over as the source can replace
+    // only groups of rows (e.g. if a source supports replacing files, unmatched rows in matched
+    // files must be carried over)
+    // it is safe to negate the condition here as the predicate pushdown for group-based row-level
+    // operations is handled in a special way
+    val remainingRowsFilter = Not(EqualNullSafe(cond, TrueLiteral))
+    val remainingRowsPlan = Filter(remainingRowsFilter, readRelation)
+
+    // build a plan to replace read groups in the table
+    val writeRelation = relation.copy(table = operationTable)
+    ReplaceData(writeRelation, cond, remainingRowsPlan, relation)
+  }
+}
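
A note on the carry-over filter above: the rewrite keeps rows with Not(EqualNullSafe(cond, TrueLiteral)) rather than Not(cond), so rows whose DELETE condition evaluates to NULL are preserved. A minimal, self-contained sketch of that three-valued behavior (plain Scala modelling the condition as Option[Boolean], not Spark expressions):

// A tiny model of SQL three-valued logic: Some(true), Some(false), or None for NULL.
// It shows why rows are kept via NOT (cond <=> true) instead of NOT cond.
object CarryOverFilterSketch extends App {
  val condResults: Seq[Option[Boolean]] = Seq(Some(true), Some(false), None)

  // naive negation: NOT cond -- a NULL condition stays NULL, so the row would be dropped
  def notCond(c: Option[Boolean]): Option[Boolean] = c.map(!_)

  // null-safe form used above: NOT (cond <=> true) -- NULL is never null-safe-equal to true,
  // so rows with a NULL condition are kept (they did not match the DELETE predicate)
  def notCondNullSafe(c: Option[Boolean]): Boolean = !c.contains(true)

  condResults.foreach { c =>
    println(s"cond=$c  NOT cond=${notCond(c)}  NOT (cond <=> true)=${notCondNullSafe(c)}")
  }
}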
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala

Lines changed: 71 additions & 0 deletions

@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
+import org.apache.spark.sql.connector.write.{RowLevelOperation, RowLevelOperationInfoImpl, RowLevelOperationTable}
+import org.apache.spark.sql.connector.write.RowLevelOperation.Command
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
+
+  protected def buildOperationTable(
+      table: SupportsRowLevelOperations,
+      command: Command,
+      options: CaseInsensitiveStringMap): RowLevelOperationTable = {
+    val info = RowLevelOperationInfoImpl(command, options)
+    val operation = table.newRowLevelOperationBuilder(info).build()
+    RowLevelOperationTable(table, operation)
+  }
+
+  protected def buildRelationWithAttrs(
+      relation: DataSourceV2Relation,
+      table: RowLevelOperationTable,
+      metadataAttrs: Seq[AttributeReference]): DataSourceV2Relation = {
+
+    val attrs = dedupAttrs(relation.output ++ metadataAttrs)
+    relation.copy(table = table, output = attrs)
+  }
+
+  protected def dedupAttrs(attrs: Seq[AttributeReference]): Seq[AttributeReference] = {
+    val exprIds = mutable.Set.empty[ExprId]
+    attrs.flatMap { attr =>
+      if (exprIds.contains(attr.exprId)) {
+        None
+      } else {
+        exprIds += attr.exprId
+        Some(attr)
+      }
+    }
+  }
+
+  protected def resolveRequiredMetadataAttrs(
+      relation: DataSourceV2Relation,
+      operation: RowLevelOperation): Seq[AttributeReference] = {
+
+    V2ExpressionUtils.resolveRefs[AttributeReference](
+      operation.requiredMetadataAttributes,
+      relation)
+  }
+}
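
The dedupAttrs helper drops repeated attributes by ExprId while preserving the order of first occurrence, which matters when the required metadata attributes overlap with the relation output. A minimal stand-alone model of that behavior (the Attr case class is a stand-in for AttributeReference, not a Spark type):

// Self-contained sketch: deduplicate by a stable id, keep first occurrence and order.
object DedupByIdSketch extends App {
  case class Attr(name: String, id: Long) // stand-in for AttributeReference keyed by ExprId

  def dedup(attrs: Seq[Attr]): Seq[Attr] = {
    val seen = scala.collection.mutable.Set.empty[Long]
    attrs.flatMap { attr =>
      if (seen.contains(attr.id)) None
      else { seen += attr.id; Some(attr) }
    }
  }

  // relation output plus required metadata attrs, where "id" is requested twice
  val output = Seq(Attr("id", 1L), Attr("data", 2L))
  val metadataAttrs = Seq(Attr("_partition", 3L), Attr("id", 1L))
  println(dedup(output ++ metadataAttrs)) // List(Attr(id,1), Attr(data,2), Attr(_partition,3))
}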

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala

Lines changed: 2 additions & 1 deletion
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, ArrayFilter, CaseWhen, EqualNullSafe, Expression, If, In, InSet, LambdaFunction, Literal, MapFilter, Not, Or}
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, DeleteFromTable, Filter, InsertAction, InsertStarAction, Join, LogicalPlan, MergeAction, MergeIntoTable, UpdateAction, UpdateStarAction, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, DeleteFromTable, Filter, InsertAction, InsertStarAction, Join, LogicalPlan, MergeAction, MergeIntoTable, ReplaceData, UpdateAction, UpdateStarAction, UpdateTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{INSET, NULL_LITERAL, TRUE_OR_FALSE_LITERAL}
 import org.apache.spark.sql.types.BooleanType
@@ -54,6 +54,7 @@ object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] {
     _.containsAnyPattern(NULL_LITERAL, TRUE_OR_FALSE_LITERAL, INSET), ruleId) {
     case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond))
     case j @ Join(_, _, _, Some(cond), _) => j.copy(condition = Some(replaceNullWithFalse(cond)))
+    case rd @ ReplaceData(_, cond, _, _, _) => rd.copy(condition = replaceNullWithFalse(cond))
     case d @ DeleteFromTable(_, cond) => d.copy(condition = replaceNullWithFalse(cond))
     case u @ UpdateTable(_, _, Some(cond)) => u.copy(condition = Some(replaceNullWithFalse(cond)))
     case m @ MergeIntoTable(_, _, mergeCond, matchedActions, notMatchedActions) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalsInPredicate.scala

Lines changed: 1 addition & 0 deletions
@@ -48,6 +48,7 @@ object SimplifyConditionalsInPredicate extends Rule[LogicalPlan] {
     _.containsAnyPattern(CASE_WHEN, IF), ruleId) {
     case f @ Filter(cond, _) => f.copy(condition = simplifyConditional(cond))
     case j @ Join(_, _, _, Some(cond), _) => j.copy(condition = Some(simplifyConditional(cond)))
+    case rd @ ReplaceData(_, cond, _, _, _) => rd.copy(condition = simplifyConditional(cond))
     case d @ DeleteFromTable(_, cond) => d.copy(condition = simplifyConditional(cond))
     case u @ UpdateTable(_, _, Some(cond)) => u.copy(condition = Some(simplifyConditional(cond)))
   }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 51 additions & 0 deletions
@@ -18,12 +18,15 @@
 package org.apache.spark.sql.catalyst.planning
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.JoinSelectionHelper
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
 import org.apache.spark.sql.internal.SQLConf
 
 trait OperationHelper extends AliasHelper with PredicateHelper {
@@ -388,3 +391,51 @@ object ExtractSingleColumnNullAwareAntiJoin extends JoinSelectionHelper with Pre
     case _ => None
   }
 }
+
+/**
+ * An extractor for row-level commands such as DELETE, UPDATE, MERGE that were rewritten using plans
+ * that operate on groups of rows.
+ *
+ * This class extracts the following entities:
+ *  - the group-based rewrite plan;
+ *  - the condition that defines matching groups;
+ *  - the read relation that can be either [[DataSourceV2Relation]] or [[DataSourceV2ScanRelation]]
+ *    depending on whether the planning has already happened;
+ */
+object GroupBasedRowLevelOperation {
+  type ReturnType = (ReplaceData, Expression, LogicalPlan)
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
+    case rd @ ReplaceData(DataSourceV2Relation(table, _, _, _, _), cond, query, _, _) =>
+      val readRelation = findReadRelation(table, query)
+      readRelation.map((rd, cond, _))
+
+    case _ =>
+      None
+  }
+
+  private def findReadRelation(
+      table: Table,
+      plan: LogicalPlan): Option[LogicalPlan] = {
+
+    val readRelations = plan.collect {
+      case r: DataSourceV2Relation if r.table eq table => r
+      case r: DataSourceV2ScanRelation if r.relation.table eq table => r
+    }
+
+    // in some cases, the optimizer replaces the v2 read relation with a local relation
+    // for example, there is no reason to query the table if the condition is always false
+    // that's why it is valid not to find the corresponding v2 read relation
+
+    readRelations match {
+      case relations if relations.isEmpty =>
+        None
+
+      case Seq(relation) =>
+        Some(relation)
+
+      case relations =>
+        throw new AnalysisException(s"Expected only one row-level read relation: $relations")
    }
  }
}
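
The extractor is intended for pattern matches over logical plans once a DELETE has been rewritten. A hedged sketch of how a downstream rule might destructure the rewritten plan (the rule itself is hypothetical and not part of this commit; only the extractor and the Spark types come from the files above):

import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: demonstrates destructuring via the extractor; it does not change the plan.
object InspectGroupBasedDeletes extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case p @ GroupBasedRowLevelOperation(replaceData, cond, readRelation) =>
      // replaceData: the rewritten group-based write (ReplaceData)
      // cond: the condition that defines matching groups
      // readRelation: the v2 read of the same table, possibly already planned into a scan relation
      p
  }
}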

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 85 additions & 2 deletions
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, ResolvedDBObjectName, UnresolvedException}
+import org.apache.spark.sql.{sources, AnalysisException}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedDBObjectName, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.FunctionResource
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
@@ -26,7 +27,8 @@ import org.apache.spark.sql.catalyst.trees.BinaryLike
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.connector.write.Write
+import org.apache.spark.sql.connector.write.{RowLevelOperation, RowLevelOperationTable, Write}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType}
 
 /**
@@ -176,6 +178,77 @@ object OverwritePartitionsDynamic {
   }
 }
 
+trait RowLevelWrite extends V2WriteCommand with SupportsSubquery {
+  def operation: RowLevelOperation
+  def condition: Expression
+  def originalTable: NamedRelation
+}
+
+/**
+ * Replace groups of data in an existing table during a row-level operation.
+ *
+ * This node is constructed in rules that rewrite DELETE, UPDATE, MERGE operations for data sources
+ * that can replace groups of data (e.g. files, partitions).
+ *
+ * @param table a plan that references a row-level operation table
+ * @param condition a condition that defines matching groups
+ * @param query a query with records that should replace the records that were read
+ * @param originalTable a plan for the original table for which the row-level command was triggered
+ * @param write a logical write, if already constructed
+ */
+case class ReplaceData(
+    table: NamedRelation,
+    condition: Expression,
+    query: LogicalPlan,
+    originalTable: NamedRelation,
+    write: Option[Write] = None) extends RowLevelWrite {
+
+  override lazy val isByName: Boolean = false
+  override lazy val references: AttributeSet = query.outputSet
+  override lazy val stringArgs: Iterator[Any] = Iterator(table, query, write)
+
+  lazy val operation: RowLevelOperation = {
+    EliminateSubqueryAliases(table) match {
+      case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+        operation
+      case _ =>
+        throw new AnalysisException(s"Cannot retrieve row-level operation from $table")
+    }
+  }
+
+  // the incoming query may include metadata columns
+  lazy val dataInput: Seq[Attribute] = {
+    val tableAttrNames = table.output.map(_.name)
+    query.output.filter(attr => tableAttrNames.exists(conf.resolver(_, attr.name)))
+  }
+
+  override def outputResolved: Boolean = {
+    assert(table.resolved && query.resolved,
+      "`outputResolved` can only be called when `table` and `query` are both resolved.")
+
+    // take into account only incoming data columns and ignore metadata columns in the query
+    // they will be discarded after the logical write is built in the optimizer
+    // metadata columns may be needed to request a correct distribution or ordering
+    // but are not passed back to the data source during writes
+
+    table.skipSchemaResolution || (dataInput.size == table.output.size &&
+      dataInput.zip(table.output).forall { case (inAttr, outAttr) =>
+        val outType = CharVarcharUtils.getRawType(outAttr.metadata).getOrElse(outAttr.dataType)
+        // names and types must match, nullability must be compatible
+        inAttr.name == outAttr.name &&
+          DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outType) &&
+          (outAttr.nullable || !inAttr.nullable)
+      })
+  }
+
+  override def withNewQuery(newQuery: LogicalPlan): ReplaceData = copy(query = newQuery)
+
+  override def withNewTable(newTable: NamedRelation): ReplaceData = copy(table = newTable)
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): ReplaceData = {
+    copy(query = newChild)
+  }
+}
 
 /** A trait used for logical plan nodes that create or replace V2 table definitions. */
 trait V2CreateTablePlan extends LogicalPlan {
@@ -457,6 +530,16 @@ case class DeleteFromTable(
     copy(table = newChild)
 }
 
+/**
+ * The logical plan of the DELETE FROM command that can be executed using data source filters.
+ *
+ * As opposed to [[DeleteFromTable]], this node represents a DELETE operation where the condition
+ * was converted into filters and the data source reported that it can handle all of them.
+ */
+case class DeleteFromTableWithFilters(
+    table: LogicalPlan,
+    condition: Seq[sources.Filter]) extends LeafCommand
+
 /**
  * The logical plan of the UPDATE TABLE command.
 */
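
One subtlety in ReplaceData above: the incoming query may carry metadata columns that the target table does not expose, so dataInput filters them out before outputResolved compares schemas. A minimal stand-alone model of that filtering (the column names and case-insensitive resolver are illustrative, not Spark types):

// Self-contained sketch of the dataInput idea: keep only query columns whose names
// resolve against the target table; extra metadata columns are ignored here.
object DataInputSketch extends App {
  val tableOutput = Seq("id", "data")                        // columns of the target table
  val queryOutput = Seq("id", "data", "_file", "_partition") // query output incl. metadata cols

  // stand-in for conf.resolver; Spark resolves names case-insensitively by default
  val resolver: (String, String) => Boolean = _.equalsIgnoreCase(_)

  val dataInput = queryOutput.filter(attr => tableOutput.exists(resolver(_, attr)))
  println(dataInput) // List(id, data)
}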
