Commit 440b42a

[SPARK-40193][SQL] Merge subquery plans with different filters
1 parent 37236f8 commit 440b42a

6 files changed (+664, −83 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
@@ -4303,6 +4303,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val PLAN_MERGE_IGNORE_PUSHED_DATA_FILTERS =
+    buildConf("spark.sql.planMerge.ignorePushedDataFilters")
+      .internal()
+      .doc("When set to true, plan merging is enabled even if physical scan operations have " +
+        "different data filters pushed down.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val ERROR_MESSAGE_FORMAT = buildConf("spark.sql.error.messageFormat")
     .doc("When PRETTY, the error message consists of textual representation of error class, " +
       "message and query context. The MINIMAL and STANDARD formats are pretty JSON formats where " +

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 3 additions & 3 deletions
@@ -58,9 +58,6 @@ class SparkOptimizer(
       new RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+
     Batch("InjectRuntimeFilter", FixedPoint(1),
       InjectRuntimeFilter) :+
-    Batch("MergeScalarSubqueries", Once,
-      MergeScalarSubqueries,
-      RewriteDistinctAggregates) :+
     Batch("Pushdown Filters from PartitionPruning", fixedPoint,
       PushDownPredicates) :+
     Batch("Cleanup filters that cannot be pushed down", Once,
@@ -92,6 +89,9 @@ class SparkOptimizer(
       LimitPushDownThroughWindow,
       EliminateLimits) :+
     Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) :+
+    Batch("Merge Scalar Subqueries", Once,
+      MergeScalarSubqueries,
+      RewriteDistinctAggregates) :+
     Batch("Replace CTE with Repartition", Once, ReplaceCTERefWithRepartition)
 
   override def nonExcludableRules: Seq[String] = super.nonExcludableRules :+
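
Moving the batch below "User Provided Optimizers" means MergeScalarSubqueries now runs after the filter-pushdown batches it previously preceded, which is what allows subqueries whose scans carry different pushed filters to be merged at all. A hedged sketch of the query shape this targets, using a hypothetical orders table, where the two scalar subqueries differ only in their filters:

val merged = spark.sql("""
  SELECT
    (SELECT avg(amount) FROM orders WHERE status = 'OPEN')   AS avg_open,
    (SELECT avg(amount) FROM orders WHERE status = 'CLOSED') AS avg_closed
""")

// With merging applied, the optimized plan should compute both aggregates
// from one shared scan of `orders` rather than two separate scans.
merged.explain()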

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 10 additions & 4 deletions
@@ -56,8 +56,14 @@ import org.apache.spark.util.collection.BitSet
  * is under the threshold with the addition of the next file, add it. If not, open a new bucket
  * and add it. Proceed to the next file.
  */
-object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
+object FileSourceStrategy extends Strategy {
+  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case FileSourceScanPlan(scanPlan, _) => scanPlan :: Nil
+    case _ => Nil
+  }
+}
 
+object FileSourceScanPlan extends PredicateHelper with Logging {
   // should prune buckets iff num buckets is greater than 1 and there is only one bucket column
   private def shouldPruneBuckets(bucketSpec: Option[BucketSpec]): Boolean = {
     bucketSpec match {
@@ -147,7 +153,7 @@
     }
   }
 
-  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+  def unapply(plan: LogicalPlan): Option[(SparkPlan, FileSourceScanExec)] = plan match {
     case ScanOperation(projects, stayUpFilters, filters,
       l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table, _)) =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
@@ -350,8 +356,8 @@
       execution.ProjectExec(projects, withFilter)
     }
 
-      withProjections :: Nil
+      Some(withProjections, scan)
 
-    case _ => Nil
+    case _ => None
   }
 }
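
The strategy split introduces an extractor: FileSourceScanPlan.unapply returns both the finished physical plan and the underlying FileSourceScanExec, so callers such as plan-merging code can inspect a scan's pushed-down data filters without re-planning. A minimal sketch of matching against it, where logicalPlan stands in for some analyzed plan:

logicalPlan match {
  case FileSourceScanPlan(physicalPlan, scanExec) =>
    // scanExec is the FileSourceScanExec; its pushed-down data filters can be
    // compared across subqueries before deciding whether to merge their plans.
    println(s"data filters: ${scanExec.dataFilters.mkString(", ")}")
  case _ =>
    println("plan is not a file source scan")
}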
