@@ -32,7 +32,31 @@ import org.apache.spark.util.{EventLoop, ThreadUtils}
3232/**
3333 * This class dynamically creates [[QueryStage ]] bottom-up, optimize the query plan of query stages
3434 * and materialize them. It creates as many query stages as possible at the same time, and
35- * creates/optimizes a query stage when all its child stages are materialized.
35+ * materialize a query stage when all its child stages are materialized.
36+ *
37+ * To create query stages, we traverse the query tree bottom up. When we hit an exchange node, and
38+ * all the child query stages of this exchange node are materialized, we try to create a new query
39+ * stage for this exchange node.
40+ *
41+ * To create a new query stage, we first optimize the sub-tree of the exchange. After optimization,
42+ * we check the output partitioning of the optimized sub-tree, and see if the exchange node is still
43+ * necessary.
44+ *
45+ * If the exchange node becomes unnecessary, remove it and give up this query stage creation, and
46+ * continue to traverse the query plan tree until we hit the next exchange node.
47+ *
48+ * If the exchange node is still needed, create the query stage and optimize its sub-tree again.
49+ * It's necessary to have both the pre-creation optimization and post-creation optimization, because
50+ * these 2 optimization have different assumptions. For pre-creation optimization, the shuffle node
51+ * may be removed later on and the current sub-tree may be only a part of a query stage, so we don't
52+ * have the big picture of the query stage yet. For post-creation optimization, the query stage is
53+ * created and we have the big picture of the query stage.
54+ *
55+ * After the query stage is optimized, we materialize it asynchronously, and continue to traverse
56+ * the query plan tree to create more query stages.
57+ *
58+ * When a query stage completes materialization, we trigger the process of query stages creation and
59+ * traverse the query plan tree again.
3660 */
3761class QueryStageCreator (
3862 initialPlan : SparkPlan ,
@@ -48,16 +72,24 @@ class QueryStageCreator(
4872
4973 private val stageCache = mutable.HashMap .empty[StructType , mutable.Buffer [(Exchange , QueryStage )]]
5074
51- private val phaseOneOptimizerRules : Seq [Rule [SparkPlan ]] = Seq (
75+ // The optimizer rules that will be applied to a sub-tree of the query plan before the stage is
76+ // created. Note that we may end up not creating the query stage, so the rules here should not
77+ // assume the given sub-plan-tree is the entire query plan of the query stage. For example, if a
78+ // rule want to collect all the child query stages, it should not be put here.
79+ private val preStageCreationOptimizerRules : Seq [Rule [SparkPlan ]] = Seq (
5280 AssertChildStagesMaterialized
5381 )
5482
55- private val phaseTwoOptimizerRules : Seq [Rule [SparkPlan ]] = Seq (
83+ // The optimizer rules that will be applied to a sub-tree of the query plan after the stage is
84+ // created. Note that once the stage is created, we will not remove it anymore. If a rule changes
85+ // the output partitioning of the sub-plan-tree, which may help to remove the exchange node, it's
86+ // better to put it in `preStageCreationOptimizerRules`, so that we may create less query stages.
87+ private val postStageCreationOptimizerRules : Seq [Rule [SparkPlan ]] = Seq (
5688 ReduceNumShufflePartitions (conf),
5789 CollapseCodegenStages (conf),
5890 ReuseSubquery (conf))
5991
60- private var currentPlan = createBottomQueryStages (initialPlan)
92+ private var currentPlan = createQueryStages (initialPlan)
6193
6294 private implicit def executionContext : ExecutionContextExecutorService = {
6395 QueryStageCreator .executionContext
@@ -80,26 +112,29 @@ class QueryStageCreator(
80112 stop()
81113 } else {
82114 readyStages += stage.id
83- currentPlan = createBottomQueryStages (currentPlan)
115+ currentPlan = createQueryStages (currentPlan)
84116 }
85117 }
86118
87- private def phaseOneOptimize (plan : SparkPlan ): SparkPlan = {
88- phaseOneOptimizerRules .foldLeft(plan) {
119+ private def preStageCreationOptimize (plan : SparkPlan ): SparkPlan = {
120+ preStageCreationOptimizerRules .foldLeft(plan) {
89121 case (current, rule) => rule(current)
90122 }
91123 }
92124
93- private def phaseTwoOptimize (plan : SparkPlan ): SparkPlan = {
94- phaseTwoOptimizerRules .foldLeft(plan) {
125+ private def postStageCreationOptimize (plan : SparkPlan ): SparkPlan = {
126+ postStageCreationOptimizerRules .foldLeft(plan) {
95127 case (current, rule) => rule(current)
96128 }
97129 }
98130
99- private def createBottomQueryStages (plan : SparkPlan ): SparkPlan = {
100- val result = tryCreateQueryStage(plan)
101- if (result.stageReady) {
102- val finalPlan = phaseTwoOptimize(phaseOneOptimize(result.newPlan))
131+ /**
132+ * Traverse the query plan bottom-up, and creates query stages as many as possible.
133+ */
134+ private def createQueryStages (plan : SparkPlan ): SparkPlan = {
135+ val result = createQueryStages0(plan)
136+ if (result.allChildStagesReady) {
137+ val finalPlan = postStageCreationOptimize(preStageCreationOptimize(result.newPlan))
103138 post(StageReady (ResultQueryStage (currentStageId, finalPlan)))
104139 finalPlan
105140 } else {
@@ -108,57 +143,70 @@ class QueryStageCreator(
108143 }
109144 }
110145
111- private def tryCreateQueryStage (plan : SparkPlan ): CreateStageResult = plan match {
146+ /**
147+ * This method is called recursively to traverse the plan tree bottom-up. This method returns two
148+ * information: 1) the new plan after we insert query stages. 2) whether or not the child query
149+ * stages of the new plan are all ready.
150+ *
151+ * if the current plan is an exchange node, and all its child query stages are ready, we try to
152+ * create a new query stage.
153+ */
154+ private def createQueryStages0 (plan : SparkPlan ): CreateStageResult = plan match {
112155 case e : Exchange =>
113156 val similarStages = stageCache.getOrElseUpdate(e.schema, mutable.Buffer .empty)
114157 similarStages.find(_._1.sameResult(e)) match {
115158 case Some ((_, existingStage)) if conf.exchangeReuseEnabled =>
116159 CreateStageResult (
117160 newPlan = ReusedQueryStage (existingStage, e.output),
118- stageReady = readyStages.contains(existingStage.id))
161+ allChildStagesReady = readyStages.contains(existingStage.id))
119162
120163 case _ =>
121- val result = tryCreateQueryStage(e.child)
122- if (result.stageReady) {
123- val optimizedPlan = phaseOneOptimize(result.newPlan)
164+ val result = createQueryStages0(e.child)
165+ // Try to create a query stage only when all the child query stages are ready.
166+ if (result.allChildStagesReady) {
167+ val optimizedPlan = preStageCreationOptimize(result.newPlan)
124168 e match {
125169 case s : ShuffleExchangeExec =>
126170 (s.desiredPartitioning, optimizedPlan.outputPartitioning) match {
127171 case (desired : HashPartitioning , actual : HashPartitioning )
128172 if desired.semanticEquals(actual) =>
129173 // This shuffle exchange is unnecessary now, remove it. The reason maybe:
130- // 1. the child plan has changed its output partitioning, and makes this
131- // exchange node unnecessary.
174+ // 1. the child plan has changed its output partitioning after optimization,
175+ // and makes this exchange node unnecessary.
132176 // 2. this exchange node is user specified, which turns out to be unnecessary.
133- CreateStageResult (newPlan = optimizedPlan, stageReady = true )
177+ CreateStageResult (newPlan = optimizedPlan, allChildStagesReady = true )
134178 case _ =>
135179 val queryStage = createQueryStage(s.copy(child = optimizedPlan))
136180 similarStages.append(e -> queryStage)
137- CreateStageResult (newPlan = queryStage, stageReady = false )
181+ // We've created a new stage, which is obviously not ready yet.
182+ CreateStageResult (newPlan = queryStage, allChildStagesReady = false )
138183 }
139184
140185 case b : BroadcastExchangeExec =>
141186 val queryStage = createQueryStage(b.copy(child = optimizedPlan))
142187 similarStages.append(e -> queryStage)
143- CreateStageResult (newPlan = queryStage, stageReady = false )
188+ // We've created a new stage, which is obviously not ready yet.
189+ CreateStageResult (newPlan = queryStage, allChildStagesReady = false )
144190 }
145191 } else {
146- CreateStageResult (newPlan = e.withNewChildren(Seq (result.newPlan)), stageReady = false )
192+ CreateStageResult (
193+ newPlan = e.withNewChildren(Seq (result.newPlan)),
194+ allChildStagesReady = false )
147195 }
148196 }
149197
150198 case q : QueryStage =>
151- CreateStageResult (newPlan = q, stageReady = readyStages.contains(q.id))
199+ CreateStageResult (newPlan = q, allChildStagesReady = readyStages.contains(q.id))
152200
153201 case _ =>
154- val results = plan.children.map(tryCreateQueryStage )
202+ val results = plan.children.map(createQueryStages0 )
155203 CreateStageResult (
156204 newPlan = plan.withNewChildren(results.map(_.newPlan)),
157- stageReady = results.forall(_.stageReady ))
205+ allChildStagesReady = results.forall(_.allChildStagesReady ))
158206 }
159207
160208 private def createQueryStage (e : Exchange ): QueryStage = {
161- val optimizedPlan = phaseTwoOptimize (e.child)
209+ val optimizedPlan = postStageCreationOptimize (e.child)
162210 val queryStage = e match {
163211 case s : ShuffleExchangeExec =>
164212 ShuffleQueryStage (currentStageId, s.copy(child = optimizedPlan))
@@ -173,7 +221,7 @@ class QueryStageCreator(
173221 override protected def onError (e : Throwable ): Unit = callback.onError(e)
174222}
175223
176- case class CreateStageResult (newPlan : SparkPlan , stageReady : Boolean )
224+ case class CreateStageResult (newPlan : SparkPlan , allChildStagesReady : Boolean )
177225
178226object QueryStageCreator {
179227 private val executionContext = ExecutionContext .fromExecutorService(
0 commit comments