Commit ecbd6db

insert query stages dynamically
1 parent ea93dbf commit ecbd6db

File tree: 11 files changed, +274 -302 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 5 additions & 18 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.adaptive.PlanQueryStage
+import org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
@@ -94,33 +94,20 @@ class QueryExecution(
    * row format conversions as needed.
    */
   protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
-    val rules = if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
-      adaptivePreparations
-    } else {
-      preparations
-    }
-    rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
+    preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
   }
 
   /** A sequence of rules that will be applied in order to the physical plan before execution. */
   protected def preparations: Seq[Rule[SparkPlan]] = Seq(
     PlanSubqueries(sparkSession),
     EnsureRequirements(sparkSession.sessionState.conf),
+    // `AdaptiveSparkPlan` is a leaf node. If inserted, all the following rules will be no-ops,
+    // as the original plan is hidden behind `AdaptiveSparkPlan`.
+    InsertAdaptiveSparkPlan(sparkSession),
     CollapseCodegenStages(sparkSession.sessionState.conf),
     ReuseExchange(sparkSession.sessionState.conf),
     ReuseSubquery(sparkSession.sessionState.conf))
 
-  // With adaptive execution, whole stage codegen will be done inside `QueryStageExecutor`.
-  protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq(
-    PlanSubqueries(sparkSession),
-    EnsureRequirements(sparkSession.sessionState.conf),
-    ReuseExchange(sparkSession.sessionState.conf),
-    ReuseSubquery(sparkSession.sessionState.conf),
-    // PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees
-    // by inserting leaf node QueryStage. Transforming the plan after applying this rule will
-    // only transform node in a sub-tree.
-    PlanQueryStage(sparkSession))
-
   def simpleString: String = withRedaction {
     val concat = new StringConcat()
     concat.append("== Physical Plan ==\n")
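
The new inline comment carries the key design constraint: `AdaptiveSparkPlan` is a leaf node, so once `InsertAdaptiveSparkPlan` wraps the plan, trailing rules like `CollapseCodegenStages` and `ReuseExchange` have nothing to recurse into. Here is a toy sketch of why that holds, using hypothetical `Node`/`Scan`/`AdaptiveRoot` types rather than Spark internals:

```scala
// Tree transforms descend only through `children`, so a childless wrapper
// hides everything beneath it from later rules.
sealed trait Node {
  def children: Seq[Node]
  def mapChildren(f: Node => Node): Node
}
case class Scan(table: String) extends Node {
  def children: Seq[Node] = Nil
  def mapChildren(f: Node => Node): Node = this
}
case class Project(child: Node) extends Node {
  def children: Seq[Node] = Seq(child)
  def mapChildren(f: Node => Node): Node = Project(f(child))
}
case class AdaptiveRoot(hidden: Node) extends Node {
  def children: Seq[Node] = Nil                 // leaf: the wrapped plan is not a child,
  def mapChildren(f: Node => Node): Node = this // so transforms cannot reach it
}

def transformUp(n: Node)(rule: PartialFunction[Node, Node]): Node = {
  val recursed = n.mapChildren(transformUp(_)(rule))
  rule.applyOrElse(recursed, identity[Node])
}

// A "codegen-like" rule that rewrites scans:
val tagScans: PartialFunction[Node, Node] = { case Scan(t) => Scan(s"codegen:$t") }

transformUp(Project(Scan("t")))(tagScans)                // Project(Scan("codegen:t"))
transformUp(AdaptiveRoot(Project(Scan("t"))))(tagScans)  // unchanged: the rule never sees the Scan
```

This is why the rule is placed after `EnsureRequirements` but before the rules it intentionally turns into no-ops.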

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@ private[execution] object SparkPlanInfo {
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
     val children = plan match {
       case ReusedExchangeExec(_, child) => child :: Nil
-      case a: AdaptiveSparkPlan => a.resultStage.plan :: Nil
+      case a: AdaptiveSparkPlan => a.finalPlan.plan :: Nil
       case stage: QueryStage => stage.plan :: Nil
       case _ => plan.children ++ plan.subqueries
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlan.scala

Lines changed: 25 additions & 40 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.adaptive
 
 import java.util.concurrent.CountDownLatch
 
+import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -31,44 +32,29 @@ import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
  * updates the query plan when a query stage is materialized and provides accurate runtime
  * statistics.
  */
-case class AdaptiveSparkPlan(initialPlan: ResultQueryStage, session: SparkSession)
+case class AdaptiveSparkPlan(initialPlan: SparkPlan, session: SparkSession)
   extends LeafExecNode {
 
   override def output: Seq[Attribute] = initialPlan.output
 
-  @volatile private var currentQueryStage: QueryStage = initialPlan
+  @volatile private var currentPlan: SparkPlan = initialPlan
   @volatile private var error: Throwable = null
-  private val readyLock = new CountDownLatch(1)
 
-  private def replaceStage(oldStage: QueryStage, newStage: QueryStage): QueryStage = {
-    if (oldStage.id == newStage.id) {
-      newStage
-    } else {
-      val newPlanForOldStage = oldStage.plan.transform {
-        case q: QueryStage => replaceStage(q, newStage)
-      }
-      oldStage.withNewPlan(newPlanForOldStage)
-    }
-  }
+  // We will release the lock when we finish planning query stages, or if we fail to do the
+  // planning. Getting `finalPlan` is blocked until the lock is released.
+  // This is better than wait()/notify(), as we can easily check whether the computation has
+  // completed by calling `readyLock.getCount()`.
+  private val readyLock = new CountDownLatch(1)
 
   private def createCallback(executionId: Option[Long]): QueryStageTriggerCallback = {
     new QueryStageTriggerCallback {
-      override def onStageUpdated(stage: QueryStage): Unit = {
-        updateCurrentQueryStage(stage, executionId)
-        if (stage.isInstanceOf[ResultQueryStage]) readyLock.countDown()
-      }
-
-      override def onStagePlanningFailed(stage: QueryStage, e: Throwable): Unit = {
-        error = new RuntimeException(
-          s"""
-             |Fail to plan stage ${stage.id}:
-             |${stage.plan.treeString}
-           """.stripMargin, e)
-        readyLock.countDown()
+      override def onPlanUpdate(updatedPlan: SparkPlan): Unit = {
+        updateCurrentPlan(updatedPlan, executionId)
+        if (updatedPlan.isInstanceOf[ResultQueryStage]) readyLock.countDown()
       }
 
       override def onStageMaterializingFailed(stage: QueryStage, e: Throwable): Unit = {
-        error = new RuntimeException(
+        error = new SparkException(
           s"""
              |Fail to materialize stage ${stage.id}:
              |${stage.plan.treeString}
@@ -83,35 +69,34 @@ case class AdaptiveSparkPlan(initialPlan: ResultQueryStage, session: SparkSession)
     }
   }
 
-  private def updateCurrentQueryStage(newStage: QueryStage, executionId: Option[Long]): Unit = {
-    currentQueryStage = replaceStage(currentQueryStage, newStage)
+  private def updateCurrentPlan(newPlan: SparkPlan, executionId: Option[Long]): Unit = {
+    currentPlan = newPlan
     executionId.foreach { id =>
       session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate(
         id,
         SQLExecution.getQueryExecution(id).toString,
-        SparkPlanInfo.fromSparkPlan(currentQueryStage)))
+        SparkPlanInfo.fromSparkPlan(currentPlan)))
     }
   }
 
-  def resultStage: ResultQueryStage = {
+  def finalPlan: ResultQueryStage = {
     if (readyLock.getCount > 0) {
       val sc = session.sparkContext
       val executionId = Option(sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
-      val trigger = new QueryStageTrigger(session, createCallback(executionId))
-      trigger.start()
-      trigger.trigger(initialPlan)
+      val creator = new QueryStageCreator(initialPlan, session, createCallback(executionId))
+      creator.start()
       readyLock.await()
-      trigger.stop()
+      creator.stop()
     }
 
     if (error != null) throw error
-    currentQueryStage.asInstanceOf[ResultQueryStage]
+    currentPlan.asInstanceOf[ResultQueryStage]
   }
 
-  override def executeCollect(): Array[InternalRow] = resultStage.executeCollect()
-  override def executeTake(n: Int): Array[InternalRow] = resultStage.executeTake(n)
-  override def executeToIterator(): Iterator[InternalRow] = resultStage.executeToIterator()
-  override def doExecute(): RDD[InternalRow] = resultStage.execute()
+  override def executeCollect(): Array[InternalRow] = finalPlan.executeCollect()
+  override def executeTake(n: Int): Array[InternalRow] = finalPlan.executeTake(n)
+  override def executeToIterator(): Iterator[InternalRow] = finalPlan.executeToIterator()
+  override def doExecute(): RDD[InternalRow] = finalPlan.execute()
   override def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
@@ -120,7 +105,7 @@ case class AdaptiveSparkPlan(initialPlan: ResultQueryStage, session: SparkSession)
       prefix: String = "",
       addSuffix: Boolean = false,
       maxFields: Int): Unit = {
-    currentQueryStage.generateTreeString(
+    currentPlan.generateTreeString(
       depth, lastChildren, append, verbose, "", false, maxFields)
   }
 }
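
The new comment on `readyLock` argues for `CountDownLatch` over `wait()`/`notify()`: completion can be checked without blocking via `getCount()`, and `await()` after `countDown()` has fired returns immediately, so there is no lost-wakeup race. A minimal standalone sketch of that pattern (plain JDK API with a hypothetical payload, not Spark code):

```scala
import java.util.concurrent.CountDownLatch

object LatchDemo extends App {
  @volatile private var result: String = null
  @volatile private var error: Throwable = null
  private val ready = new CountDownLatch(1)

  // Producer thread: publish a result or an error, then release the latch exactly once.
  new Thread(() => {
    try result = "final plan"
    catch { case t: Throwable => error = t }
    finally ready.countDown()
  }).start()

  // Consumer: unlike wait()/notify(), completion can be tested cheaply first,
  // and awaiting a latch that already counted down returns immediately.
  if (ready.getCount > 0) ready.await()
  if (error != null) throw error
  println(result) // "final plan"
}
```

`finalPlan` above follows the same shape: check `getCount`, start the background work, `await()`, then surface any recorded error.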
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala

Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan] {
+
+  override def apply(plan: SparkPlan): SparkPlan = plan match {
+    case _: ExecutedCommandExec => plan
+    case _ if session.sessionState.conf.adaptiveExecutionEnabled =>
+      AdaptiveSparkPlan(plan, session.cloneSession())
+  }
+}
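
As committed, the match has arms only for `ExecutedCommandExec` and the AE-enabled case, so a plan that hits neither would throw a `MatchError`; `session.cloneSession()` presumably gives the adaptive root an isolated session state to re-optimize under. A self-contained sketch of the rule's shape, with toy `Plan`/`Command`/`Adaptive` types (not Spark classes) and a fall-through arm added to keep the match total when the flag is off:

```scala
sealed trait Plan
case class Command(name: String) extends Plan
case class Scan(table: String) extends Plan
case class Adaptive(hidden: Plan) extends Plan // leaf-like wrapper, plan hidden inside

val aeEnabled = true // stand-in for the adaptive execution flag

def insertAdaptive(plan: Plan): Plan = plan match {
  case c: Command => c               // commands are never wrapped
  case p if aeEnabled => Adaptive(p) // wrap everything else when AE is on
  case p => p                        // fall-through: no MatchError when AE is off
}

insertAdaptive(Scan("t"))      // Adaptive(Scan(t))
insertAdaptive(Command("SET")) // Command(SET), unchanged
```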

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStage.scala

Lines changed: 0 additions & 57 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala

Lines changed: 2 additions & 1 deletion
@@ -65,6 +65,7 @@ abstract class QueryStage extends LeafExecNode {
   override def executeToIterator(): Iterator[InternalRow] = plan.executeToIterator()
   override def doExecute(): RDD[InternalRow] = plan.execute()
   override def doExecuteBroadcast[T](): Broadcast[T] = plan.executeBroadcast()
+  override def doCanonicalize(): SparkPlan = plan.canonicalized
 
   // TODO: maybe we should not hide QueryStage entirely from explain result.
   override def generateTreeString(
@@ -86,7 +87,7 @@ abstract class QueryStage extends LeafExecNode {
 case class ResultQueryStage(id: Int, plan: SparkPlan) extends QueryStage {
 
   override def materialize(): Future[Any] = {
-    Future.unit
+    throw new IllegalStateException("Cannot materialize ResultQueryStage.")
   }
 
   override def withNewPlan(newPlan: SparkPlan): QueryStage = {
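
The new `doCanonicalize` override makes a `QueryStage` canonicalize to the plan it wraps, presumably so that `sameResult`-style comparisons, which reuse rules such as `ReuseExchange` rely on, see through the wrapper. A toy sketch of canonicalization-based equality, with hypothetical types rather than Spark internals:

```scala
sealed trait Plan { def canonicalized: Plan = this }
case class Scan(table: String, alias: String) extends Plan {
  // The canonical form drops cosmetic differences such as aliases.
  override def canonicalized: Plan = copy(alias = "")
}
case class Stage(id: Int, plan: Plan) extends Plan {
  override def canonicalized: Plan = plan.canonicalized // delegate, as in the diff
}

def sameResult(a: Plan, b: Plan): Boolean = a.canonicalized == b.canonicalized

sameResult(Stage(1, Scan("t", "x")), Scan("t", "y")) // true: the wrapper is transparent
```

Two nodes compare equal exactly when their canonical forms do, so delegating canonicalization makes a stage interchangeable with its wrapped plan.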
