Skip to content

Commit 325bc8e

Browse files
maryannxuehvanhovell
authored andcommitted
[SPARK-28583][SQL] Subqueries should not call onUpdatePlan in Adaptive Query Execution
## What changes were proposed in this pull request? Subqueries do not have their own execution id, thus when calling `AdaptiveSparkPlanExec.onUpdatePlan`, it will actually get the `QueryExecution` instance of the main query, which is wasteful and problematic. It could cause issues like stack overflow or dead locks in some circumstances. This PR fixes this issue by making `AdaptiveSparkPlanExec` compare the `QueryExecution` object retrieved by current execution ID against the `QueryExecution` object from which this plan is created, and only update the UI when the two instances are the same. ## How was this patch tested? Manual tests on TPC-DS queries. Closes #25316 from maryannxue/aqe-updateplan-fix. Authored-by: maryannxue <[email protected]> Signed-off-by: herman <[email protected]>
1 parent a59fdc4 commit 325bc8e

File tree

3 files changed

+22
-9
lines changed

3 files changed

+22
-9
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ class QueryExecution(
118118
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
119119
// `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
120120
// as the original plan is hidden behind `AdaptiveSparkPlanExec`.
121-
InsertAdaptiveSparkPlan(sparkSession),
121+
InsertAdaptiveSparkPlan(sparkSession, this),
122122
PlanSubqueries(sparkSession),
123123
EnsureRequirements(sparkSession.sessionState.conf),
124124
ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ case class AdaptiveSparkPlanExec(
6161
initialPlan: SparkPlan,
6262
@transient session: SparkSession,
6363
@transient subqueryMap: Map[Long, ExecSubqueryExpression],
64-
@transient stageCache: TrieMap[SparkPlan, QueryStageExec])
64+
@transient stageCache: TrieMap[SparkPlan, QueryStageExec],
65+
@transient queryExecution: QueryExecution)
6566
extends LeafExecNode {
6667

6768
@transient private val lock = new Object()
@@ -118,8 +119,15 @@ case class AdaptiveSparkPlanExec(
118119
if (isFinalPlan) {
119120
currentPhysicalPlan.execute()
120121
} else {
122+
// Make sure we only update Spark UI if this plan's `QueryExecution` object matches the one
123+
// retrieved by the `sparkContext`'s current execution ID. Note that sub-queries do not have
124+
// their own execution IDs and therefore rely on the main query to update UI.
121125
val executionId = Option(
122-
session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
126+
session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).flatMap { idStr =>
127+
val id = idStr.toLong
128+
val qe = SQLExecution.getQueryExecution(id)
129+
if (qe.eq(queryExecution)) Some(id) else None
130+
}
123131
var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
124132
var result = createQueryStages(currentPhysicalPlan)
125133
val events = new LinkedBlockingQueue[StageMaterializationEvent]()
@@ -171,10 +179,11 @@ case class AdaptiveSparkPlanExec(
171179
currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
172180
currentPhysicalPlan.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, currentLogicalPlan)
173181
isFinalPlan = true
182+
183+
val ret = currentPhysicalPlan.execute()
174184
logDebug(s"Final plan: $currentPhysicalPlan")
175185
executionId.foreach(onUpdatePlan)
176-
177-
currentPhysicalPlan.execute()
186+
ret
178187
}
179188
}
180189

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,18 @@ import org.apache.spark.sql.types.StructType
3535
*
3636
* Note that this rule is stateful and thus should not be reused across query executions.
3737
*/
38-
case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan] {
38+
case class InsertAdaptiveSparkPlan(
39+
session: SparkSession,
40+
queryExecution: QueryExecution) extends Rule[SparkPlan] {
3941

4042
private val conf = session.sessionState.conf
4143

4244
// Exchange-reuse is shared across the entire query, including sub-queries.
4345
private val stageCache = new TrieMap[SparkPlan, QueryStageExec]()
4446

45-
override def apply(plan: SparkPlan): SparkPlan = plan match {
47+
override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, queryExecution)
48+
49+
private def applyInternal(plan: SparkPlan, qe: QueryExecution): SparkPlan = plan match {
4650
case _: ExecutedCommandExec => plan
4751
case _ if conf.adaptiveExecutionEnabled && supportAdaptive(plan) =>
4852
try {
@@ -54,7 +58,7 @@ case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan
5458
session.sessionState.conf, subqueryMap)
5559
val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preparations)
5660
logDebug(s"Adaptive execution enabled for plan: $plan")
57-
AdaptiveSparkPlanExec(newPlan, session, subqueryMap, stageCache)
61+
AdaptiveSparkPlanExec(newPlan, session, subqueryMap, stageCache, qe)
5862
} catch {
5963
case SubqueryAdaptiveNotSupportedException(subquery) =>
6064
logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
@@ -120,7 +124,7 @@ case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan
120124
val queryExec = new QueryExecution(session, plan)
121125
// Apply the same instance of this rule to sub-queries so that sub-queries all share the
122126
// same `stageCache` for Exchange reuse.
123-
val adaptivePlan = this.apply(queryExec.sparkPlan)
127+
val adaptivePlan = this.applyInternal(queryExec.sparkPlan, queryExec)
124128
if (!adaptivePlan.isInstanceOf[AdaptiveSparkPlanExec]) {
125129
throw SubqueryAdaptiveNotSupportedException(plan)
126130
}

0 commit comments

Comments
 (0)