Skip to content

Commit 632784d

Browse files
committed
[SPARK-38959][SQL][FOLLOWUP] Do not optimize subqueries twice
### What changes were proposed in this pull request?

This is a followup of #38557. We found that some optimizer rules can't be applied twice (those in the `Once` batch), but running the rule `OptimizeSubqueries` twice breaks it, as it optimizes subqueries twice. This PR partially reverts #38557 to still invoke `OptimizeSubqueries` in `RowLevelOperationRuntimeGroupFiltering`. We don't fully revert #38557 because it's still beneficial to use an IN subquery directly instead of using the DPP framework, as there is no join.

### Why are the changes needed?

Fix the optimizer.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

N/A

Closes #38626 from cloud-fan/follow.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent e873871 commit 632784d

File tree

4 files changed

+10
-6
lines changed

4 files changed

+10
-6
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@ class SparkOptimizer(
5151
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
5252
Batch("PartitionPruning", Once,
5353
PartitionPruning,
54-
RowLevelOperationRuntimeGroupFiltering,
55-
OptimizeSubqueries) :+
54+
// We can't run `OptimizeSubqueries` in this batch, as it will optimize the subqueries
55+
// twice which may break some optimizer rules that can only be applied once. The rule below
56+
// only invokes `OptimizeSubqueries` to optimize newly added subqueries.
57+
new RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+
5658
Batch("InjectRuntimeFilter", FixedPoint(1),
5759
InjectRuntimeFilter) :+
5860
Batch("MergeScalarSubqueries", Once,

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelati
3232
case class PlanAdaptiveDynamicPruningFilters(
3333
rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] with AdaptiveSparkPlanHelper {
3434
def apply(plan: SparkPlan): SparkPlan = {
35-
if (!conf.dynamicPartitionPruningEnabled && !conf.runtimeRowLevelOperationGroupFilterEnabled) {
35+
if (!conf.dynamicPartitionPruningEnabled) {
3636
return plan
3737
}
3838

sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
4545
}
4646

4747
override def apply(plan: SparkPlan): SparkPlan = {
48-
if (!conf.dynamicPartitionPruningEnabled && !conf.runtimeRowLevelOperationGroupFilterEnabled) {
48+
if (!conf.dynamicPartitionPruningEnabled) {
4949
return plan
5050
}
5151

sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, Dat
3737
*
3838
* Note this rule only applies to group-based row-level operations.
3939
*/
40-
object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with PredicateHelper {
40+
class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
41+
extends Rule[LogicalPlan] with PredicateHelper {
4142

4243
import DataSourceV2Implicits._
4344

@@ -64,7 +65,8 @@ object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with Pre
6465
Filter(dynamicPruningCond, r)
6566
}
6667

67-
replaceData.copy(query = newQuery)
68+
// optimize subqueries to rewrite them as joins and trigger job planning
69+
replaceData.copy(query = optimizeSubqueries(newQuery))
6870
}
6971

7072
private def buildMatchingRowsPlan(

0 commit comments

Comments (0)