@@ -319,6 +319,7 @@ class Analyzer(override val catalogManager: CatalogManager)
ResolveRandomSeed ::
ResolveBinaryArithmetic ::
ResolveUnion ::
RewriteDeleteFromTable ::
typeCoercionRules ++
Seq(ResolveWithCTE) ++
extendedResolutionRules : _*),
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression, Not}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, LogicalPlan, ReplaceData}
import org.apache.spark.sql.connector.catalog.{SupportsDelete, SupportsRowLevelOperations, TruncatableTable}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
import org.apache.spark.sql.connector.write.RowLevelOperationTable
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* A rule that rewrites DELETE operations using plans that operate on individual rows or groups of rows.
*
* If a table implements [[SupportsDelete]] and [[SupportsRowLevelOperations]], this rule will
* still rewrite the DELETE operation but the optimizer will check whether this particular DELETE
* statement can be handled by simply passing delete filters to the connector. If so, the optimizer
* will discard the rewritten plan and will allow the data source to delete using filters.
*/
object RewriteDeleteFromTable extends RewriteRowLevelCommand {

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case d @ DeleteFromTable(aliasedTable, cond) if d.resolved =>
EliminateSubqueryAliases(aliasedTable) match {
case DataSourceV2Relation(_: TruncatableTable, _, _, _, _) if cond == TrueLiteral =>
// don't rewrite as the table supports truncation
d

case r @ DataSourceV2Relation(t: SupportsRowLevelOperations, _, _, _, _) =>
val table = buildOperationTable(t, DELETE, CaseInsensitiveStringMap.empty())
Review comment (Member): Should we pass options from the V2 relation instead of just using empty?

Reply (author): I am not sure. These are the options passed into newRowLevelOperationBuilder, and I thought they should come from the SQL operation. For example, if Spark ever adds an OPTIONS clause to its SQL for DELETE, UPDATE, and MERGE, those values would be propagated here.
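A sketch of the reviewer's suggestion, shown only for illustration and not what the patch does: it reuses the r bound in the pattern above, whose options field is already a CaseInsensitiveStringMap.

    // forward the relation's options instead of an empty map
    val table = buildOperationTable(t, DELETE, r.options)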

buildReplaceDataPlan(r, table, cond)

case DataSourceV2Relation(_: SupportsDelete, _, _, _, _) =>
// don't rewrite as the table supports deletes only with filters
d

case DataSourceV2Relation(t, _, _, _, _) =>
throw QueryCompilationErrors.tableDoesNotSupportDeletesError(t)

case _ =>
d
}
}

// build a rewrite plan for sources that support replacing groups of data (e.g. files, partitions)
private def buildReplaceDataPlan(
relation: DataSourceV2Relation,
operationTable: RowLevelOperationTable,
cond: Expression): ReplaceData = {

// resolve all required metadata attrs that may be used for grouping data on write
// for instance, JDBC data source may cluster data by shard/host before writing
val metadataAttrs = resolveRequiredMetadataAttrs(relation, operationTable.operation)
Review comment (cloud-fan, Mar 25, 2022): I'm a little confused here. Why do we need to include metadata attributes to build the "main scan", which is the input of the write operation? According to our previous discussion, metadata attributes should be used to build the scan for collecting the affected groups, e.g. the _file_name metadata column.

Reply (aokolnychyi, Mar 29, 2022): Metadata columns may be used for multiple purposes (see the connector sketch below):

  • grouping data on write
    Data sources may reference metadata columns in RequiresDistributionAndOrdering in the main write. To support this, such metadata columns must be included in the main scan. The comment above mentions potentially clustering data by shard/host before writing in JDBC data sources (just an example).

  • runtime filtering
    Data sources may reference metadata columns in SupportsRuntimeFiltering in the main scan. The idea we discussed earlier is that Spark builds the main scan, which exposes _file_name as a runtime filtering attribute; an optimizer rule assigns a filter subquery that uses another scan builder (i.e. one for runtime filtering); Spark executes the subquery, collects the unique values of _file_name, and passes the results back to the main scan as an IN filter on _file_name.

In both cases, the metadata columns are projected away once the write is built and are never passed to the writer.
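A minimal, hypothetical sketch of such a connector, assuming the RowLevelOperation interface introduced with this work; the class name, the _shard column, and the elided builders are illustrative assumptions, not part of this patch.

    import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
    import org.apache.spark.sql.connector.read.ScanBuilder
    import org.apache.spark.sql.connector.write.{LogicalWriteInfo, RowLevelOperation, WriteBuilder}
    import org.apache.spark.sql.connector.write.RowLevelOperation.Command
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    class ShardedJdbcDeleteOperation extends RowLevelOperation {
      override def command(): Command = Command.DELETE

      // ask Spark to expose a _shard metadata column in the main scan so that the
      // main write can cluster rows by shard via RequiresDistributionAndOrdering
      override def requiredMetadataAttributes(): Array[NamedReference] =
        Array(Expressions.column("_shard"))

      // real scan/write builders are elided in this sketch
      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = ???
      override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = ???
    }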


// construct a read relation and include all required metadata columns
val readRelation = buildRelationWithAttrs(relation, operationTable, metadataAttrs)

// construct a plan that contains unmatched rows in matched groups that must be carried over
// such rows do not match the condition but have to be copied over as the source can replace
// only groups of rows (e.g. if a source supports replacing files, unmatched rows in matched
// files must be carried over)
// it is safe to negate the condition here as the predicate pushdown for group-based row-level
// operations is handled in a special way
val remainingRowsFilter = Not(EqualNullSafe(cond, TrueLiteral))
Review comment (Member): Curious why we use EqualNullSafe here. What does it mean when cond evaluates to null?

Reply (author): It is necessary to avoid cases where null = true evaluates to null and then gets propagated further. Here, null = true should evaluate to false (see the small illustration below).
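A small illustration of that null handling with Catalyst literals, a sketch using the expression API directly for a hypothetical row whose condition evaluates to null:

    import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Literal, Not}
    import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
    import org.apache.spark.sql.types.BooleanType

    val cond = Literal(null, BooleanType)           // the delete condition is null for this row
    Not(cond).eval()                                // null  -> a plain Filter would drop the row
    Not(EqualNullSafe(cond, TrueLiteral)).eval()    // true  -> the row is kept and carried over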

val remainingRowsPlan = Filter(remainingRowsFilter, readRelation)

// build a plan to replace read groups in the table
val writeRelation = relation.copy(table = operationTable)
ReplaceData(writeRelation, cond, remainingRowsPlan, relation)
}
}
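To make the shape of the rewrite concrete, a rough sketch of what this rule produces for a hypothetical table t with an id column, backed by a SupportsRowLevelOperations source:

    // DELETE FROM t WHERE id < 10   is rewritten roughly into:
    //
    //   ReplaceData(
    //     writeRelation,                                  // relation over the RowLevelOperationTable
    //     condition     = (id < 10),                      // original delete condition
    //     query         = Filter(NOT((id < 10) <=> true), // rows to carry over: non-matching rows
    //                            readRelation),           //   in matched groups, incl. metadata cols
    //     originalTable = t)
    //
    // If the table also implements SupportsDelete, the optimizer may later discard this
    // plan and push the delete down to the source as filters instead.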
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
import org.apache.spark.sql.connector.write.{RowLevelOperation, RowLevelOperationInfoImpl, RowLevelOperationTable}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.util.CaseInsensitiveStringMap

trait RewriteRowLevelCommand extends Rule[LogicalPlan] {

protected def buildOperationTable(
Author note: I renamed the method and added options. I did not default the options to an empty map, to keep the signature on one line.

table: SupportsRowLevelOperations,
command: Command,
options: CaseInsensitiveStringMap): RowLevelOperationTable = {
val info = RowLevelOperationInfoImpl(command, options)
val operation = table.newRowLevelOperationBuilder(info).build()
RowLevelOperationTable(table, operation)
}

protected def buildRelationWithAttrs(
relation: DataSourceV2Relation,
table: RowLevelOperationTable,
metadataAttrs: Seq[AttributeReference]): DataSourceV2Relation = {

val attrs = dedupAttrs(relation.output ++ metadataAttrs)
relation.copy(table = table, output = attrs)
}

protected def dedupAttrs(attrs: Seq[AttributeReference]): Seq[AttributeReference] = {
val exprIds = mutable.Set.empty[ExprId]
attrs.flatMap { attr =>
if (exprIds.contains(attr.exprId)) {
None
} else {
exprIds += attr.exprId
Some(attr)
}
}
}
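A tiny illustration of dedupAttrs with hypothetical attributes; it assumes the call happens inside a rule that mixes in this trait, since the helper is protected:

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.{IntegerType, StringType}

    val id = AttributeReference("id", IntegerType)()
    val shard = AttributeReference("_shard", StringType)()

    dedupAttrs(Seq(id, shard, id))   // => Seq(id, shard): the duplicate shares the same ExprId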

protected def resolveRequiredMetadataAttrs(
relation: DataSourceV2Relation,
operation: RowLevelOperation): Seq[AttributeReference] = {

V2ExpressionUtils.resolveRefs[AttributeReference](
operation.requiredMetadataAttributes,
relation)
}
}
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, ArrayFilter, CaseWhen, EqualNullSafe, Expression, If, In, InSet, LambdaFunction, Literal, MapFilter, Not, Or}
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, DeleteFromTable, Filter, InsertAction, InsertStarAction, Join, LogicalPlan, MergeAction, MergeIntoTable, UpdateAction, UpdateStarAction, UpdateTable}
import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, DeleteFromTable, Filter, InsertAction, InsertStarAction, Join, LogicalPlan, MergeAction, MergeIntoTable, ReplaceData, UpdateAction, UpdateStarAction, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{INSET, NULL_LITERAL, TRUE_OR_FALSE_LITERAL}
import org.apache.spark.sql.types.BooleanType
@@ -54,6 +54,7 @@ object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] {
_.containsAnyPattern(NULL_LITERAL, TRUE_OR_FALSE_LITERAL, INSET), ruleId) {
case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond))
case j @ Join(_, _, _, Some(cond), _) => j.copy(condition = Some(replaceNullWithFalse(cond)))
case rd @ ReplaceData(_, cond, _, _, _) => rd.copy(condition = replaceNullWithFalse(cond))
case d @ DeleteFromTable(_, cond) => d.copy(condition = replaceNullWithFalse(cond))
case u @ UpdateTable(_, _, Some(cond)) => u.copy(condition = Some(replaceNullWithFalse(cond)))
case m @ MergeIntoTable(_, _, mergeCond, matchedActions, notMatchedActions) =>
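As a rough illustration of what the added ReplaceData case enables, for a hypothetical plan fragment: a rewritten row-level command whose condition folds to a null literal is now simplified like the other predicate holders.

    // ReplaceData(writeRel, Literal(null, BooleanType), query, t)
    //   --ReplaceNullWithFalseInPredicate-->
    // ReplaceData(writeRel, FalseLiteral, query, t)
    // i.e. a null condition behaves as false: no groups match and nothing is replaced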
@@ -48,6 +48,7 @@ object SimplifyConditionalsInPredicate extends Rule[LogicalPlan] {
_.containsAnyPattern(CASE_WHEN, IF), ruleId) {
case f @ Filter(cond, _) => f.copy(condition = simplifyConditional(cond))
case j @ Join(_, _, _, Some(cond), _) => j.copy(condition = Some(simplifyConditional(cond)))
case rd @ ReplaceData(_, cond, _, _, _) => rd.copy(condition = simplifyConditional(cond))
case d @ DeleteFromTable(_, cond) => d.copy(condition = simplifyConditional(cond))
case u @ UpdateTable(_, _, Some(cond)) => u.copy(condition = Some(simplifyConditional(cond)))
}
@@ -18,12 +18,15 @@
package org.apache.spark.sql.catalyst.planning

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.JoinSelectionHelper
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.spark.sql.internal.SQLConf

trait OperationHelper extends AliasHelper with PredicateHelper {
@@ -388,3 +391,51 @@ object ExtractSingleColumnNullAwareAntiJoin extends JoinSelectionHelper with Pre
case _ => None
}
}

/**
* An extractor for row-level commands such as DELETE, UPDATE, MERGE that were rewritten using plans
* that operate on groups of rows.
*
* This class extracts the following entities:
* - the group-based rewrite plan;
* - the condition that defines matching groups;
* - the read relation that can be either [[DataSourceV2Relation]] or [[DataSourceV2ScanRelation]]
* depending on whether the planning has already happened;
*/
object GroupBasedRowLevelOperation {
type ReturnType = (ReplaceData, Expression, LogicalPlan)

def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
case rd @ ReplaceData(DataSourceV2Relation(table, _, _, _, _), cond, query, _, _) =>
val readRelation = findReadRelation(table, query)
readRelation.map((rd, cond, _))

case _ =>
None
}

private def findReadRelation(
table: Table,
plan: LogicalPlan): Option[LogicalPlan] = {

val readRelations = plan.collect {
case r: DataSourceV2Relation if r.table eq table => r
case r: DataSourceV2ScanRelation if r.relation.table eq table => r
}

// in some cases, the optimizer replaces the v2 read relation with a local relation
// for example, there is no reason to query the table if the condition is always false
// that's why it is valid not to find the corresponding v2 read relation

readRelations match {
case relations if relations.isEmpty =>
None

case Seq(relation) =>
Some(relation)

case relations =>
throw new AnalysisException(s"Expected only one row-level read relation: $relations")
}
}
}
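A hypothetical sketch of how a planner or optimizer rule might consume this extractor; InspectGroupBasedWrites and its body are illustrative assumptions, not part of this patch:

    import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule

    object InspectGroupBasedWrites extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case g @ GroupBasedRowLevelOperation(rd, cond, readRelation) =>
          // e.g. a runtime-filtering rule could attach a filter subquery to readRelation
          logInfo(s"group-based ${rd.operation.command} with condition $cond over $readRelation")
          g
      }
    }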
@@ -17,16 +17,18 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, ResolvedDBObjectName, UnresolvedException}
import org.apache.spark.sql.{sources, AnalysisException}
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedDBObjectName, UnresolvedException}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, MetadataAttribute, Unevaluable}
import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
import org.apache.spark.sql.catalyst.trees.BinaryLike
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.Write
import org.apache.spark.sql.connector.write.{RowLevelOperation, RowLevelOperationTable, Write}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType}

/**
@@ -176,6 +178,80 @@ object OverwritePartitionsDynamic {
}
}

trait RowLevelWrite extends V2WriteCommand with SupportsSubquery {
def operation: RowLevelOperation
def condition: Expression
def originalTable: NamedRelation
}

/**
* Replace groups of data in an existing table during a row-level operation.
*
* This node is constructed in rules that rewrite DELETE, UPDATE, MERGE operations for data sources
* that can replace groups of data (e.g. files, partitions).
*
* @param table a plan that references a row-level operation table
* @param condition a condition that defines matching groups
* @param query a query with records that should replace the records that were read
* @param originalTable a plan for the original table for which the row-level command was triggered
* @param write a logical write, if already constructed
*/
case class ReplaceData(
table: NamedRelation,
condition: Expression,
query: LogicalPlan,
originalTable: NamedRelation,
write: Option[Write] = None) extends RowLevelWrite {

override val isByName: Boolean = false
override val stringArgs: Iterator[Any] = Iterator(table, query, write)

override lazy val references: AttributeSet = query.outputSet

lazy val operation: RowLevelOperation = {
EliminateSubqueryAliases(table) match {
case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
operation
case _ =>
throw new AnalysisException(s"Cannot retrieve row-level operation from $table")
}
}

// the incoming query may include metadata columns
lazy val dataInput: Seq[Attribute] = {
query.output.filter {
case MetadataAttribute(_) => false
case _ => true
}
}

override def outputResolved: Boolean = {
assert(table.resolved && query.resolved,
"`outputResolved` can only be called when `table` and `query` are both resolved.")

// take into account only incoming data columns and ignore metadata columns in the query
// they will be discarded after the logical write is built in the optimizer
// metadata columns may be needed to request a correct distribution or ordering
// but are not passed back to the data source during writes

table.skipSchemaResolution || (dataInput.size == table.output.size &&
Review comment (Contributor): Do we really need to check this? The input query is built by Spark and directly reads the table.

Reply (author): It may be redundant for DELETE, but it will be required for UPDATE and MERGE, where the incoming values no longer depend solely on what was read. This will prevent, for instance, writing nullable values into non-nullable attributes (see the small illustration after this class).

dataInput.zip(table.output).forall { case (inAttr, outAttr) =>
val outType = CharVarcharUtils.getRawType(outAttr.metadata).getOrElse(outAttr.dataType)
// names and types must match, nullability must be compatible
inAttr.name == outAttr.name &&
DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outType) &&
(outAttr.nullable || !inAttr.nullable)
})
}

override def withNewQuery(newQuery: LogicalPlan): ReplaceData = copy(query = newQuery)

override def withNewTable(newTable: NamedRelation): ReplaceData = copy(table = newTable)

override protected def withNewChildInternal(newChild: LogicalPlan): ReplaceData = {
copy(query = newChild)
}
}
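Illustrating the per-attribute nullability compatibility used in outputResolved above, as a trivial, self-contained restatement of the check rather than new behavior:

    // a nullable input attribute may only be written into a nullable table attribute;
    // non-nullable input is always acceptable
    def nullabilityCompatible(inNullable: Boolean, outNullable: Boolean): Boolean =
      outNullable || !inNullable

    nullabilityCompatible(inNullable = true,  outNullable = false)  // false -> outputResolved fails
    nullabilityCompatible(inNullable = false, outNullable = true)   // true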

/** A trait used for logical plan nodes that create or replace V2 table definitions. */
trait V2CreateTablePlan extends LogicalPlan {
@@ -457,6 +533,16 @@ case class DeleteFromTable(
copy(table = newChild)
}

/**
* The logical plan of the DELETE FROM command that can be executed using data source filters.
*
* As opposed to [[DeleteFromTable]], this node represents a DELETE operation where the condition
* was converted into filters and the data source reported that it can handle all of them.
*/
case class DeleteFromTableWithFilters(
table: LogicalPlan,
condition: Seq[sources.Filter]) extends LeafCommand
Author note: The import looks a bit weird. I can do an aliased import if that's any better.

Review comment (Contributor): I would probably move DeleteFromTableWithFilters to a follow-up commit, since it is an optimization and not needed for correctness.

Review comment (cloud-fan, Feb 23, 2022): Well, Spark can already plan filter-based DELETE today, so not supporting it would be a regression.

Reply (aokolnychyi, Feb 23, 2022): @cloud-fan, DeleteFromTableWithFilters is an optimization for SupportsRowLevelOperations; existing deletes with filters would be unaffected. That said, I am going to combine the existing logic in DataSourceV2Strategy with the optimizer rule I added, as discussed here. That way, the filter conversion logic will live in just one place. Let me know if you agree.

Review comment (Contributor): +1
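A small hypothetical illustration of what this node would carry once the optimizer has converted the condition into source filters that the connector accepted; tableRelation and the literal values are assumptions:

    import org.apache.spark.sql.sources

    // DELETE FROM t WHERE id = 5, after id = 5 has been translated into data source
    // filters and the source reported it can handle all of them:
    val filters: Seq[sources.Filter] = Seq(sources.EqualTo("id", 5))
    // DeleteFromTableWithFilters(tableRelation, filters)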


/**
* The logical plan of the UPDATE TABLE command.
*/