@@ -21,6 +21,7 @@
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
@@ -68,6 +69,19 @@ default String description() {
* be returned by the scan, even if a filter can narrow the set of changes to a single file
* in the partition. Similarly, a data source that can swap individual files must produce all
* rows from files where at least one record must be changed, not just rows that must be changed.
* <p>
* Data sources that replace groups of data (e.g. files, partitions) may prune entire groups
* using provided data source filters when building a scan for this row-level operation.
* However, such data skipping is limited as not all expressions can be converted into data source
* filters and some can only be evaluated by Spark (e.g. subqueries). Since rewriting groups is
* expensive, Spark allows group-based data sources to filter groups at runtime. The runtime
* filtering enables data sources to narrow down the scope of rewriting to only groups that must
* be rewritten. If the row-level operation scan implements {@link SupportsRuntimeV2Filtering},
* Spark will execute a query at runtime to find which records match the row-level condition.
* The runtime group filter subquery will leverage a regular batch scan, which isn't required to
* produce all rows in a group if any are returned. The information about matching records will
* be passed back into the row-level operation scan, allowing data sources to discard groups
* that don't have to be rewritten.
*/
ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);
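To illustrate the contract described above from the data source side, here is a minimal, hedged sketch (not part of this diff) of a group-based source whose row-level operation scan opts into runtime group filtering. All names other than the Spark connector interfaces are assumptions.

    import org.apache.spark.sql.connector.expressions.NamedReference
    import org.apache.spark.sql.connector.expressions.filter.Predicate
    import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, SupportsRuntimeV2Filtering}
    import org.apache.spark.sql.types.StructType

    // hypothetical scan for a source that rewrites whole groups (e.g. partitions)
    class GroupRewriteScan(
        schema: StructType,
        groupingColumn: NamedReference,
        allGroups: Seq[InputPartition],
        readerFactory: PartitionReaderFactory)
      extends Scan with Batch with SupportsRuntimeV2Filtering {

      // groups that still have to be rewritten; runtime filtering narrows this down
      private var groups: Seq[InputPartition] = allGroups

      override def readSchema(): StructType = schema

      override def toBatch: Batch = this

      // attributes Spark should build the runtime group filter on
      override def filterAttributes(): Array[NamedReference] = Array(groupingColumn)

      // called by Spark with predicates derived from the "matching rows" subquery;
      // groups that cannot contain matching rows are discarded and never rewritten
      override def filter(predicates: Array[Predicate]): Unit = {
        groups = groups.filter(group => mayMatch(group, predicates))
      }

      override def planInputPartitions(): Array[InputPartition] = groups.toArray

      override def createReaderFactory(): PartitionReaderFactory = readerFactory

      // placeholder: a real source would evaluate the predicates against
      // group-level metadata such as partition values or file statistics
      private def mayMatch(group: InputPartition, predicates: Array[Predicate]): Boolean = true
    }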

@@ -412,6 +412,21 @@ object SQLConf {
.longConf
.createWithDefault(67108864L)

val RUNTIME_ROW_LEVEL_OPERATION_GROUP_FILTER_ENABLED =
buildConf("spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled")
aokolnychyi (Contributor, Author), Sep 30, 2022:
I went back and forth on the name. On one hand, we have dynamic partition pruning. On the other hand, we call it runtime filtering in DS V2. Ideas are welcome.

aokolnychyi (Contributor, Author):
I also used the spark.sql.optimizer.runtime prefix, like for the runtime Bloom filter joins. There are other runtime-related configs that don't use this prefix, so let me know the correct config namespace.

.doc("Enables runtime group filtering for group-based row-level operations. " +
"Data sources that replace groups of data (e.g. files, partitions) may prune entire " +
"groups using provided data source filters when planning a row-level operation scan. " +
"However, such filtering is limited as not all expressions can be converted into data " +
"source filters and some expressions can only be evaluated by Spark (e.g. subqueries). " +
"Since rewriting groups is expensive, Spark can execute a query at runtime to find what " +
"records match the condition of the row-level operation. The information about matching " +
"records will be passed back to the row-level operation scan, allowing data sources to " +
"discard groups that don't have to be rewritten.")
.version("3.4.0")
.booleanConf
.createWithDefault(true)
Member:
I agree with starting with true in this case.
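A small usage sketch (not part of this diff): the new flag behaves like any other SQL configuration entry and can be toggled per session, assuming spark is an active SparkSession.

    // disable runtime group filtering for the current session (illustrative only)
    spark.conf.set("spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled", "false")

    // read the effective value back
    val groupFilterEnabled =
      spark.conf.get("spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled")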


val PLANNED_WRITE_ENABLED = buildConf("spark.sql.optimizer.plannedWrite.enabled")
.internal()
.doc("When set to true, Spark optimizer will add logical sort operators to V1 write commands " +
@@ -4084,6 +4099,9 @@ class SQLConf extends Serializable with Logging {
def runtimeFilterCreationSideThreshold: Long =
getConf(RUNTIME_BLOOM_FILTER_CREATION_SIDE_THRESHOLD)

def runtimeRowLevelOperationGroupFilterEnabled: Boolean =
getConf(RUNTIME_ROW_LEVEL_OPERATION_GROUP_FILTER_ENABLED)

def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)

def isStateSchemaCheckEnabled: Boolean = getConf(STATE_SCHEMA_CHECK_ENABLED)
@@ -34,6 +34,9 @@ class InMemoryRowLevelOperationTable(
properties: util.Map[String, String])
extends InMemoryTable(name, schema, partitioning, properties) with SupportsRowLevelOperations {

// used in row-level operation tests to verify replaced partitions
var replacedPartitions: Seq[Seq[Any]] = Seq.empty
Member:
Maybe add a comment to mention this is for test.

aokolnychyi (Contributor, Author):
Added a comment above.


override def newRowLevelOperationBuilder(
info: RowLevelOperationInfo): RowLevelOperationBuilder = {
() => PartitionBasedOperation(info.command)
@@ -88,8 +91,9 @@ class InMemoryRowLevelOperationTable(
override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
val newData = messages.map(_.asInstanceOf[BufferedRows])
val readRows = scan.data.flatMap(_.asInstanceOf[BufferedRows].rows)
val readPartitions = readRows.map(r => getKey(r, schema))
val readPartitions = readRows.map(r => getKey(r, schema)).distinct
dataMap --= readPartitions
replacedPartitions = readPartitions
withData(newData, schema)
}
}
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.execution.datasources.{PruneFileSourcePartitions, SchemaPruning, V1Writes}
import org.apache.spark.sql.execution.datasources.v2.{GroupBasedRowLevelOperationScanPlanning, OptimizeMetadataOnlyDeleteFromTable, V2ScanPartitioningAndOrdering, V2ScanRelationPushDown, V2Writes}
import org.apache.spark.sql.execution.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning}
import org.apache.spark.sql.execution.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning, RowLevelOperationRuntimeGroupFiltering}
import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs}

class SparkOptimizer(
@@ -50,7 +50,8 @@
override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
Batch("PartitionPruning", Once,
PartitionPruning) :+
PartitionPruning,
RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+
Contributor:
I think another idea is to run OptimizeSubqueries in this batch:

    PartitionPruning,
    RowLevelOperationRuntimeGroupFiltering,
    // The rule above may create subqueries, need to optimize them.
    OptimizeSubqueries

aokolnychyi (Contributor, Author), Nov 6, 2022:
This would be much cleaner but SPARK-36444 removed OptimizeSubqueries from that batch.

Batch("InjectRuntimeFilter", FixedPoint(1),
InjectRuntimeFilter) :+
Batch("MergeScalarSubqueries", Once,
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelati
case class PlanAdaptiveDynamicPruningFilters(
rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] with AdaptiveSparkPlanHelper {
def apply(plan: SparkPlan): SparkPlan = {
if (!conf.dynamicPartitionPruningEnabled) {
if (!conf.dynamicPartitionPruningEnabled && !conf.runtimeRowLevelOperationGroupFilterEnabled) {
return plan
}

@@ -45,7 +45,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
}

override def apply(plan: SparkPlan): SparkPlan = {
if (!conf.dynamicPartitionPruningEnabled) {
if (!conf.dynamicPartitionPruningEnabled && !conf.runtimeRowLevelOperationGroupFilterEnabled) {
return plan
}

@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.dynamicpruning

import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}

/**
* A rule that assigns a subquery to filter groups in row-level operations at runtime.
*
* Data skipping during job planning for row-level operations is limited to expressions that can be
* converted to data source filters. Since not all expressions can be pushed down that way and
 * rewriting groups is expensive, Spark allows data sources to filter groups at runtime.
* If the primary scan in a group-based row-level operation supports runtime filtering, this rule
* will inject a subquery to find all rows that match the condition so that data sources know
* exactly which groups must be rewritten.
*
* Note this rule only applies to group-based row-level operations.
*/
case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
Contributor:
Why do we need to pass the rule as a parameter? Can't we call OptimizeSubqueries directly in this rule?

Member:
I also thought about this, but I think it's hard to reference OptimizeSubqueries outside Optimizer, since the former is more like an "inner class" of the latter and references the current instance of the latter in itself (i.e. Optimizer.this.execute(Subquery.fromExpression(s))).

aokolnychyi (Contributor, Author):
@sunchao is correct. It wasn't easy to call OptimizeSubqueries outside Optimizer. Hence, I had to come up with this workaround.

@cloud-fan, I also considered simply adding OptimizeSubqueries to the batch with runtime partition filtering. However, SPARK-36444 specifically removed it from there.

aokolnychyi (Contributor, Author):
An alternative idea could be to move OptimizeSubqueries into its own file. However, that's tricky too, as it calls the optimizer:

    Optimizer.this.execute(Subquery.fromExpression(s))

extends Rule[LogicalPlan] with PredicateHelper {

import DataSourceV2Implicits._

override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
// apply special dynamic filtering only for group-based row-level operations
case GroupBasedRowLevelOperation(replaceData, cond,
DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _))
aokolnychyi (Contributor, Author), Sep 30, 2022:
This is the optimizer rule that checks whether the primary row-level scan supports runtime filtering. As long as a data source implements SupportsRuntimeV2Filtering, it should be enough to benefit from the new functionality.

Also, the runtime group filter uses the existing framework for runtime filtering in DS V2, meaning we get all the benefits like subquery reuse, etc.

if conf.runtimeRowLevelOperationGroupFilterEnabled && cond != TrueLiteral =>

// use reference equality on scan to find required scan relations
val newQuery = replaceData.query transformUp {
case r: DataSourceV2ScanRelation if r.scan eq scan =>
// use the original table instance that was loaded for this row-level operation
// in order to leverage a regular batch scan in the group filter query
val originalTable = r.relation.table.asRowLevelOperationTable.table
val relation = r.relation.copy(table = originalTable)
aokolnychyi (Contributor, Author), Sep 30, 2022:
We build DataSourceV2Relation so the scan can prune columns and push filters into groups.

val matchingRowsPlan = buildMatchingRowsPlan(relation, cond)

val filterAttrs = scan.filterAttributes
val buildKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, matchingRowsPlan)
val pruningKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, r)
val dynamicPruningCond = buildDynamicPruningCond(matchingRowsPlan, buildKeys, pruningKeys)

Filter(dynamicPruningCond, r)
}

// optimize subqueries to rewrite them as joins and trigger job planning
replaceData.copy(query = optimizeSubqueries(newQuery))
Contributor:
Does this mean we revert what we did in RewriteDeleteFromTable before?

aokolnychyi (Contributor, Author):
Not really, @cloud-fan. This rule simply attaches a runtime filter to the plan that was created while rewriting the delete. We do replace the query, but it is pretty much the same plan, just with an extra runtime filter.

}

private def buildMatchingRowsPlan(
relation: DataSourceV2Relation,
cond: Expression): LogicalPlan = {

val matchingRowsPlan = Filter(cond, relation)

// clone the relation and assign new expr IDs to avoid conflicts
matchingRowsPlan transformUpWithNewOutput {
case r: DataSourceV2Relation if r eq relation =>
val oldOutput = r.output
val newOutput = oldOutput.map(_.newInstance())
r.copy(output = newOutput) -> oldOutput.zip(newOutput)
Contributor:
nit:

    val newRelation = r.newInstance
    newRelation -> r.output.zip(newRelation.output)

}
}

private def buildDynamicPruningCond(
matchingRowsPlan: LogicalPlan,
buildKeys: Seq[Attribute],
pruningKeys: Seq[Attribute]): Expression = {

val buildQuery = Project(buildKeys, matchingRowsPlan)
val dynamicPruningSubqueries = pruningKeys.zipWithIndex.map { case (key, index) =>
DynamicPruningSubquery(key, buildQuery, buildKeys, index, onlyInBroadcast = false)
}
dynamicPruningSubqueries.reduce(And)
}
}
@@ -22,7 +22,7 @@ import java.util.Collections
import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, QueryTest, Row}
import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryRowLevelOperationTableCatalog}
import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryRowLevelOperationTable, InMemoryRowLevelOperationTableCatalog}
import org.apache.spark.sql.connector.expressions.LogicalExpressions._
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -46,15 +46,19 @@ abstract class DeleteFromTableSuiteBase
spark.sessionState.conf.unsetConf("spark.sql.catalog.cat")
}

private val namespace = Array("ns1")
private val ident = Identifier.of(namespace, "test_table")
private val tableNameAsString = "cat." + ident.toString
protected val namespace: Array[String] = Array("ns1")
protected val ident: Identifier = Identifier.of(namespace, "test_table")
protected val tableNameAsString: String = "cat." + ident.toString

private def catalog: InMemoryRowLevelOperationTableCatalog = {
protected def catalog: InMemoryRowLevelOperationTableCatalog = {
val catalog = spark.sessionState.catalogManager.catalog("cat")
catalog.asTableCatalog.asInstanceOf[InMemoryRowLevelOperationTableCatalog]
}

protected def table: InMemoryRowLevelOperationTable = {
catalog.loadTable(ident).asInstanceOf[InMemoryRowLevelOperationTable]
}

test("EXPLAIN only delete") {
createAndInitTable("id INT, dep STRING", """{ "id": 1, "dep": "hr" }""")

@@ -553,13 +557,13 @@ abstract class DeleteFromTableSuiteBase
}
}

private def createTable(schemaString: String): Unit = {
protected def createTable(schemaString: String): Unit = {
val schema = StructType.fromDDL(schemaString)
val tableProps = Collections.emptyMap[String, String]
catalog.createTable(ident, schema, Array(identity(reference(Seq("dep")))), tableProps)
}

private def createAndInitTable(schemaString: String, jsonData: String): Unit = {
protected def createAndInitTable(schemaString: String, jsonData: String): Unit = {
createTable(schemaString)
append(schemaString, jsonData)
}
@@ -606,7 +610,7 @@ abstract class DeleteFromTableSuiteBase
}

// executes an operation and keeps the executed plan
private def executeAndKeepPlan(func: => Unit): SparkPlan = {
protected def executeAndKeepPlan(func: => Unit): SparkPlan = {
var executedPlan: SparkPlan = null

val listener = new QueryExecutionListener {
Expand All @@ -625,5 +629,3 @@ abstract class DeleteFromTableSuiteBase
stripAQEPlan(executedPlan)
}
}

class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase
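A rough sketch (not part of this diff) of how the hooks made protected above and the new replacedPartitions field could be combined to verify runtime group filtering; the suite name and the exact assertion are illustrative assumptions, and the base suite is assumed to expose sql through its shared session.

    class HypotheticalRuntimeGroupFilteringSuite extends DeleteFromTableSuiteBase {

      test("delete rewrites only partitions with matching rows") {
        createAndInitTable("id INT, dep STRING",
          """{ "id": 1, "dep": "hr" }
            |{ "id": 2, "dep": "software" }""".stripMargin)

        executeAndKeepPlan {
          sql(s"DELETE FROM $tableNameAsString WHERE id = 1")
        }

        // with runtime group filtering enabled, only the 'hr' partition is expected
        // to be read back and rewritten, so exactly one partition should be replaced
        assert(table.replacedPartitions.size == 1)
      }
    }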