
Commit 0820beb

cloud-fan authored and HyukjinKwon committed
[SPARK-28863][SQL][FOLLOWUP][3.0] Make sure optimized plan will not be re-analyzed
backport #30777 to 3.0

----------

### What changes were proposed in this pull request?

It's a known issue that re-analyzing an optimized plan can lead to various problems. We made several attempts to prevent it, but the current solution, `AlreadyOptimized`, is still not 100% safe, as people can inject catalyst rules that call the analyzer directly. This PR proposes a simpler and safer idea: set the `analyzed` flag to true after optimization, and have the analyzer skip any plan whose `analyzed` flag is already true.

### Why are the changes needed?

It makes the code simpler and safer.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #30872 from cloud-fan/ds.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
1 parent c9fe712 commit 0820beb
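
To make the mechanism concrete, here is a minimal sketch of the flag-based guard described above. The `Plan`, `Leaf`, `Node`, and `Analyzer` names are illustrative stand-ins for this sketch, not Spark's real classes:

```scala
// Minimal model of the analyzed-flag guard. Not Spark's actual classes.
sealed trait Plan {
  private var _analyzed = false
  def analyzed: Boolean = _analyzed
  def children: Seq[Plan]
  // Recursively mark this subtree as analyzed, mirroring setAnalyzed().
  def setAnalyzed(): Unit = {
    if (!_analyzed) {
      _analyzed = true
      children.foreach(_.setAnalyzed())
    }
  }
}

case class Leaf(name: String) extends Plan {
  def children: Seq[Plan] = Nil
}
case class Node(children: Seq[Plan]) extends Plan

object Analyzer {
  def executeAndCheck(plan: Plan): Plan = {
    // The new guard: an already-analyzed plan is returned untouched, so an
    // optimized plan can never be re-analyzed by accident.
    if (plan.analyzed) return plan
    val analyzed = runRules(plan)
    analyzed.setAnalyzed()
    analyzed
  }
  private def runRules(plan: Plan): Plan = plan // rule execution elided
}
```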

File tree

6 files changed: 16 additions & 127 deletions


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -144,6 +144,7 @@ class Analyzer(
   }
 
   def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
+    if (plan.analyzed) return plan
     AnalysisHelper.markInAnalyzer {
       val analyzed = executeAndTrack(plan, tracker)
       try {
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala

Lines changed: 6 additions & 1 deletion
```diff
@@ -46,7 +46,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
    * Recursively marks all nodes in this plan tree as analyzed.
    * This should only be called by [[CheckAnalysis]].
    */
-  private[catalyst] def setAnalyzed(): Unit = {
+  private[sql] def setAnalyzed(): Unit = {
     if (!_analyzed) {
       _analyzed = true
       children.foreach(_.setAnalyzed())
@@ -180,6 +180,11 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
     super.transformAllExpressions(rule)
   }
 
+  override def clone(): LogicalPlan = {
+    val cloned = super.clone()
+    if (analyzed) cloned.setAnalyzed()
+    cloned
+  }
 }
```
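The `clone()` override is the subtle part of this file: `QueryExecution` clones plans between phases, and a structurally fresh copy would otherwise start with `_analyzed = false` and become eligible for re-analysis. A sketch of the same idea in the toy model above (names are again illustrative):

```scala
// A structural copy produces fresh nodes whose flag defaults to false,
// so the copy must be re-marked explicitly, as clone() now does.
def cloneWithFlag(plan: Plan): Plan = {
  val cloned = structuralCopy(plan)
  if (plan.analyzed) cloned.setAnalyzed() // carry the flag over to the copy
  cloned
}

def structuralCopy(plan: Plan): Plan = plan match {
  case Leaf(name)     => Leaf(name)
  case Node(children) => Node(children.map(structuralCopy))
}
```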

sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala

Lines changed: 0 additions & 37 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 6 additions & 1 deletion
```diff
@@ -79,7 +79,12 @@ class QueryExecution(
   lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
     // clone the plan to avoid sharing the plan instance between different stages like analyzing,
     // optimizing and planning.
-    sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
+    val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
+    // We do not want optimized plans to be re-analyzed as literals that have been constant folded
+    // and such can cause issues during analysis. While `clone` should maintain the `analyzed` state
+    // of the LogicalPlan, we set the plan as analyzed here as well out of paranoia.
+    plan.setAnalyzed()
+    plan
   }
 
   private def assertOptimized(): Unit = optimizedPlan
```
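
Taken together, the phases now behave roughly as sketched below, still in the toy model (the real `QueryExecution` additionally threads a `QueryPlanningTracker` and cached data through each phase):

```scala
// Toy end-to-end flow: the optimizer marks its output as analyzed, so a
// later trip through Analyzer.executeAndCheck is a no-op for that plan.
def executePhases(parsed: Plan): Plan = {
  val analyzed = Analyzer.executeAndCheck(parsed)
  val optimized = {
    val plan = optimize(cloneWithFlag(analyzed)) // the clone keeps the flag
    plan.setAnalyzed() // re-marked defensively, as in optimizedPlan above
    plan
  }
  // Even a rule that feeds `optimized` back into the analyzer gets it back
  // unchanged, thanks to the guard.
  Analyzer.executeAndCheck(optimized)
}

def optimize(plan: Plan): Plan = plan // optimizer rules elided
```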

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -20,12 +20,13 @@ package org.apache.spark.sql.execution.datasources.v2
 import java.util.UUID
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.SupportsWrite
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
-import org.apache.spark.sql.execution.{AlreadyOptimized, SparkPlan}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -113,8 +114,7 @@ trait SupportsV1Write extends SparkPlan {
   def plan: LogicalPlan
 
   protected def writeWithV1(relation: InsertableRelation): Seq[InternalRow] = {
-    // The `plan` is already optimized, we should not analyze and optimize it again.
-    relation.insert(AlreadyOptimized.dataFrame(sqlContext.sparkSession, plan), overwrite = false)
+    relation.insert(Dataset.ofRows(sqlContext.sparkSession, plan), overwrite = false)
     Nil
   }
 }
```
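
With the flag in place, the plain `Dataset.ofRows` call is safe even though it sends the plan back through the analyzer: the guard added to `executeAndCheck` returns an already-marked plan untouched, which is exactly what the deleted `AlreadyOptimized` wrapper used to guarantee. In the toy model:

```scala
// Simplified stand-in for Dataset.ofRows: it re-analyzes in general, but
// an optimized (already-marked) plan short-circuits through the guard.
def ofRows(plan: Plan): Plan = Analyzer.executeAndCheck(plan)

val optimized = executePhases(Leaf("scan"))
assert(ofRows(optimized) eq optimized) // no re-analysis took place
```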

sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala

Lines changed: 0 additions & 85 deletions
This file was deleted.
