
Commit b4bea1a

cloud-fan authored and HyukjinKwon committed
[SPARK-28863][SQL][FOLLOWUP] Make sure optimized plan will not be re-analyzed
### What changes were proposed in this pull request?

It's a known issue that re-analyzing an optimized plan can lead to various problems. We have made several attempts to prevent this, but the current solution, `AlreadyOptimized`, is still not 100% safe, as people can inject catalyst rules that call the analyzer directly. This PR proposes a simpler and safer idea: set the `analyzed` flag to true after optimization, and have the analyzer skip any plan whose `analyzed` flag is already true.

### Why are the changes needed?

Make the code simpler and safer.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes apache#30777 from cloud-fan/ds.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
Parent: cdd1752
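
To make the mechanism concrete, here is a minimal sketch of the flag-based guard in plain Scala. `MiniPlan` and `MiniAnalyzer` are hypothetical stand-ins invented for illustration; they are not Spark classes.

```scala
// Minimal sketch of the analyzed-flag guard described above.
// MiniPlan and MiniAnalyzer are hypothetical, not Spark classes.
class MiniPlan(val children: Seq[MiniPlan] = Nil) {
  private var _analyzed = false
  def analyzed: Boolean = _analyzed

  // Mark this plan and all of its descendants as analyzed.
  def setAnalyzed(): Unit = if (!_analyzed) {
    _analyzed = true
    children.foreach(_.setAnalyzed())
  }
}

object MiniAnalyzer {
  def executeAndCheck(plan: MiniPlan): MiniPlan = {
    // The fix in this commit: a plan that is already analyzed
    // (e.g. an optimized plan) passes through untouched.
    if (plan.analyzed) return plan
    val analyzed = runRules(plan)
    analyzed.setAnalyzed()
    analyzed
  }

  // Stand-in for the real analysis rule batches.
  private def runRules(plan: MiniPlan): MiniPlan = plan
}
```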

6 files changed: 16 additions, 128 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 0 deletions
@@ -168,6 +168,7 @@ class Analyzer(override val catalogManager: CatalogManager)
   }
 
   def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
+    if (plan.analyzed) return plan
     AnalysisHelper.markInAnalyzer {
       val analyzed = executeAndTrack(plan, tracker)
       try {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala

Lines changed: 6 additions & 1 deletion
@@ -46,7 +46,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
    * This should only be called by
    * [[org.apache.spark.sql.catalyst.analysis.CheckAnalysis]].
    */
-  private[catalyst] def setAnalyzed(): Unit = {
+  private[sql] def setAnalyzed(): Unit = {
     if (!_analyzed) {
       _analyzed = true
       children.foreach(_.setAnalyzed())
@@ -180,6 +180,11 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
     super.transformAllExpressions(rule)
   }
 
+  override def clone(): LogicalPlan = {
+    val cloned = super.clone()
+    if (analyzed) cloned.setAnalyzed()
+    cloned
+  }
 }
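
The `clone()` override is the piece that keeps the guard effective across phases: `QueryExecution` clones plans between analysis, optimization, and planning, and a copy that forgot its `analyzed` flag could be re-analyzed. A hedged sketch of that behavior, again using a hypothetical `MiniPlan` rather than Spark's `LogicalPlan`:

```scala
// Sketch: clone() must carry the analyzed flag over to the copy.
// MiniPlan is a hypothetical stand-in, not a Spark class.
class MiniPlan(val children: Seq[MiniPlan] = Nil) {
  private var _analyzed = false
  def analyzed: Boolean = _analyzed

  def setAnalyzed(): Unit = if (!_analyzed) {
    _analyzed = true
    children.foreach(_.setAnalyzed())
  }

  // Deep copy that preserves the analyzed flag, mirroring the
  // AnalysisHelper change: cloning can never "un-analyze" a plan.
  override def clone(): MiniPlan = {
    val cloned = new MiniPlan(children.map(_.clone()))
    if (analyzed) cloned.setAnalyzed()
    cloned
  }
}
```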

sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyOptimized.scala

Lines changed: 0 additions & 37 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 6 additions & 1 deletion
@@ -84,7 +84,12 @@ class QueryExecution(
   lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
     // clone the plan to avoid sharing the plan instance between different stages like analyzing,
     // optimizing and planning.
-    sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
+    val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
+    // We do not want optimized plans to be re-analyzed as literals that have been constant folded
+    // and such can cause issues during analysis. While `clone` should maintain the `analyzed` state
+    // of the LogicalPlan, we set the plan as analyzed here as well out of paranoia.
+    plan.setAnalyzed()
+    plan
   }
 
   private def assertOptimized(): Unit = optimizedPlan
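
Putting the two changes together, the intended end-to-end behavior can be demonstrated with a toy harness. Everything below (`MiniPlan`, `analyze`, `optimize`) is a hypothetical illustration of the flow, not Spark's API:

```scala
// Toy end-to-end flow mirroring QueryExecution's phases.
// All names here are hypothetical illustrations, not Spark APIs.
object MiniQueryExecutionDemo {
  final class MiniPlan { var analyzed = false }

  def analyze(plan: MiniPlan): MiniPlan = {
    if (plan.analyzed) return plan // the new analyzer guard
    plan.analyzed = true           // pretend we ran the analysis rules
    plan
  }

  def optimize(plan: MiniPlan): MiniPlan = {
    val optimized = plan           // pretend we ran the optimizer rules
    optimized.analyzed = true      // the new setAnalyzed() call above
    optimized
  }

  def main(args: Array[String]): Unit = {
    val optimized = optimize(analyze(new MiniPlan))
    // Re-submitting the optimized plan (as the V1 fallback writer does
    // via Dataset.ofRows) no longer triggers re-analysis:
    assert(analyze(optimized) eq optimized)
    println("optimized plan passed through analysis untouched")
  }
}
```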

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala

Lines changed: 3 additions & 4 deletions
@@ -20,12 +20,13 @@ package org.apache.spark.sql.execution.datasources.v2
 import java.util.UUID
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.SupportsWrite
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
-import org.apache.spark.sql.execution.{AlreadyOptimized, SparkPlan}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -118,9 +119,7 @@ trait SupportsV1Write extends SparkPlan {
   protected def writeWithV1(
       relation: InsertableRelation,
       refreshCache: () => Unit = () => ()): Seq[InternalRow] = {
-    val session = sqlContext.sparkSession
-    // The `plan` is already optimized, we should not analyze and optimize it again.
-    relation.insert(AlreadyOptimized.dataFrame(session, plan), overwrite = false)
+    relation.insert(Dataset.ofRows(sqlContext.sparkSession, plan), overwrite = false)
     refreshCache()
 
     Nil

sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyOptimizedSuite.scala

Lines changed: 0 additions & 85 deletions
This file was deleted.
