Merged

Changes from all commits
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.adaptive.PlanQueryStage
+import org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
@@ -94,32 +94,19 @@ class QueryExecution(
    * row format conversions as needed.
    */
   protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
-    val rules = if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
-      adaptivePreparations
-    } else {
-      preparations
-    }
-    rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
+    preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
   }
 
   /** A sequence of rules that will be applied in order to the physical plan before execution. */
   protected def preparations: Seq[Rule[SparkPlan]] = Seq(
     PlanSubqueries(sparkSession),
+    ReuseSubquery(sparkSession.sessionState.conf),
     EnsureRequirements(sparkSession.sessionState.conf),
+    // `AdaptiveSparkPlan` is a leaf node. If inserted, all the following rules will be no-ops,
+    // as the original plan is hidden behind `AdaptiveSparkPlan`.
+    InsertAdaptiveSparkPlan(sparkSession),
     CollapseCodegenStages(sparkSession.sessionState.conf),
-    ReuseExchange(sparkSession.sessionState.conf),
-    ReuseSubquery(sparkSession.sessionState.conf))
-
-  // With adaptive execution, whole stage codegen will be done inside `QueryStageExecutor`.
-  protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq(
-    PlanSubqueries(sparkSession),
-    EnsureRequirements(sparkSession.sessionState.conf),
-    ReuseExchange(sparkSession.sessionState.conf),
-    ReuseSubquery(sparkSession.sessionState.conf),
-    // PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees
-    // by inserting leaf node QueryStage. Transforming the plan after applying this rule will
-    // only transform node in a sub-tree.
-    PlanQueryStage(sparkSession))
+    ReuseExchange(sparkSession.sessionState.conf))
 
   def simpleString: String = withRedaction {
     val concat = new StringConcat()
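Two things stand out in the `QueryExecution` change: adaptive execution no longer needs its own rule list, and `InsertAdaptiveSparkPlan` is ordered so that the rules after it become no-ops once the plan is wrapped. The core pattern is folding a plan through an ordered rule list; the following minimal Scala sketch shows the same shape with toy `Rule`/`Plan` types (not Spark's real classes):

```scala
// Minimal sketch of the rule-pipeline pattern in prepareForExecution: each
// preparation is a Rule over the plan type, and the plan is threaded through
// the whole sequence with foldLeft. Toy types, not Spark's real classes.
trait Rule[T] {
  def apply(t: T): T
}

case class Plan(description: String)

// Each toy rule rewrites the plan and records what it did.
object EnsureExchanges extends Rule[Plan] {
  def apply(p: Plan): Plan = Plan(p.description + " -> exchanges ensured")
}

// Like InsertAdaptiveSparkPlan, a rule may wrap the whole plan in a new root,
// hiding the original plan from any rule that runs afterwards.
object WrapAdaptive extends Rule[Plan] {
  def apply(p: Plan): Plan = Plan(s"Adaptive(${p.description})")
}

object RulePipelineDemo extends App {
  val preparations: Seq[Rule[Plan]] = Seq(EnsureExchanges, WrapAdaptive)
  // Same shape as QueryExecution.prepareForExecution.
  val prepared = preparations.foldLeft(Plan("physical plan")) {
    case (plan, rule) => rule.apply(plan)
  }
  println(prepared.description) // Adaptive(physical plan -> exchanges ensured)
}
```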
@@ -53,7 +53,7 @@ private[execution] object SparkPlanInfo {
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
     val children = plan match {
       case ReusedExchangeExec(_, child) => child :: Nil
-      case a: AdaptiveSparkPlan => a.resultStage.plan :: Nil
+      case a: AdaptiveSparkPlan => a.finalPlan.plan :: Nil
       case stage: QueryStage => stage.plan :: Nil
       case _ => plan.children ++ plan.subqueries
     }
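The `SparkPlanInfo` change keeps the UI tree builder in step with the rename: wrapper nodes report no children of their own, so `fromSparkPlan` has to reach through them explicitly. A minimal sketch of that unwrapping idea, with invented stand-in types:

```scala
// Sketch of the unwrapping in SparkPlanInfo.fromSparkPlan: wrapper nodes such
// as AdaptiveSparkPlan and QueryStage are leaf nodes to the planner, so the
// UI tree builder must reach through them to the plan they hide.
// The types here are invented stand-ins.
sealed trait Node {
  def name: String
  def children: Seq[Node]
}
case class Scan(name: String) extends Node {
  def children: Seq[Node] = Nil
}
// A stage wraps a sub-tree but, being a leaf node, reports no children.
case class Stage(name: String, plan: Node) extends Node {
  def children: Seq[Node] = Nil
}
case class Join(name: String, left: Node, right: Node) extends Node {
  def children: Seq[Node] = Seq(left, right)
}

case class Info(name: String, children: Seq[Info])

def fromNode(node: Node): Info = {
  // Reach through wrappers so the rendered tree shows the hidden sub-tree.
  val visible = node match {
    case s: Stage => s.plan :: Nil
    case _ => node.children
  }
  Info(node.name, visible.map(fromNode))
}

// fromNode(Stage("shuffle stage", Scan("t1"))) renders the hidden scan node.
```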
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.adaptive
 
 import java.util.concurrent.CountDownLatch
 
+import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -27,48 +28,33 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SparkPlanInfo, S
 import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
 
 /**
- * A root node to trigger query stages and execute the query plan adaptively. It incrementally
+ * A root node to execute the query plan adaptively. It creates query stages, and incrementally
  * updates the query plan when a query stage is materialized and provides accurate runtime
- * statistics.
+ * data statistics.
  */
-case class AdaptiveSparkPlan(initialPlan: ResultQueryStage, session: SparkSession)
+case class AdaptiveSparkPlan(initialPlan: SparkPlan, session: SparkSession)
   extends LeafExecNode {
 
   override def output: Seq[Attribute] = initialPlan.output
 
-  @volatile private var currentQueryStage: QueryStage = initialPlan
+  @volatile private var currentPlan: SparkPlan = initialPlan
   @volatile private var error: Throwable = null
-  private val readyLock = new CountDownLatch(1)
 
-  private def replaceStage(oldStage: QueryStage, newStage: QueryStage): QueryStage = {
-    if (oldStage.id == newStage.id) {
-      newStage
-    } else {
-      val newPlanForOldStage = oldStage.plan.transform {
-        case q: QueryStage => replaceStage(q, newStage)
-      }
-      oldStage.withNewPlan(newPlanForOldStage)
-    }
-  }
+  // We will release the lock when we finish planning query stages, or when we fail to do the
+  // planning. Getting `finalPlan` will be blocked until the lock is released.
+  // This is better than wait()/notify(), as we can easily check if the computation has completed,
+  // by calling `readyLock.getCount()`.
+  private val readyLock = new CountDownLatch(1)
 
   private def createCallback(executionId: Option[Long]): QueryStageTriggerCallback = {
     new QueryStageTriggerCallback {
-      override def onStageUpdated(stage: QueryStage): Unit = {
-        updateCurrentQueryStage(stage, executionId)
-        if (stage.isInstanceOf[ResultQueryStage]) readyLock.countDown()
-      }
-
-      override def onStagePlanningFailed(stage: QueryStage, e: Throwable): Unit = {
-        error = new RuntimeException(
-          s"""
-             |Fail to plan stage ${stage.id}:
-             |${stage.plan.treeString}
-           """.stripMargin, e)
-        readyLock.countDown()
+      override def onPlanUpdate(updatedPlan: SparkPlan): Unit = {
+        updateCurrentPlan(updatedPlan, executionId)
+        if (updatedPlan.isInstanceOf[ResultQueryStage]) readyLock.countDown()
       }
 
       override def onStageMaterializingFailed(stage: QueryStage, e: Throwable): Unit = {
-        error = new RuntimeException(
+        error = new SparkException(
           s"""
             |Fail to materialize stage ${stage.id}:
            |${stage.plan.treeString}
@@ -83,35 +69,34 @@ case class AdaptiveSparkPlan(initialPlan: ResultQueryStage, session: SparkSessio
     }
   }
 
-  private def updateCurrentQueryStage(newStage: QueryStage, executionId: Option[Long]): Unit = {
-    currentQueryStage = replaceStage(currentQueryStage, newStage)
+  private def updateCurrentPlan(newPlan: SparkPlan, executionId: Option[Long]): Unit = {
+    currentPlan = newPlan
     executionId.foreach { id =>
       session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate(
         id,
         SQLExecution.getQueryExecution(id).toString,
-        SparkPlanInfo.fromSparkPlan(currentQueryStage)))
+        SparkPlanInfo.fromSparkPlan(currentPlan)))
     }
   }
 
-  def resultStage: ResultQueryStage = {
+  def finalPlan: ResultQueryStage = {
     if (readyLock.getCount > 0) {
       val sc = session.sparkContext
       val executionId = Option(sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
-      val trigger = new QueryStageTrigger(session, createCallback(executionId))
-      trigger.start()
-      trigger.trigger(initialPlan)
+      val creator = new QueryStageCreator(initialPlan, session, createCallback(executionId))
+      creator.start()
       readyLock.await()
-      trigger.stop()
+      creator.stop()
    }
 
     if (error != null) throw error
-    currentQueryStage.asInstanceOf[ResultQueryStage]
+    currentPlan.asInstanceOf[ResultQueryStage]
   }
 
-  override def executeCollect(): Array[InternalRow] = resultStage.executeCollect()
-  override def executeTake(n: Int): Array[InternalRow] = resultStage.executeTake(n)
-  override def executeToIterator(): Iterator[InternalRow] = resultStage.executeToIterator()
-  override def doExecute(): RDD[InternalRow] = resultStage.execute()
+  override def executeCollect(): Array[InternalRow] = finalPlan.executeCollect()
+  override def executeTake(n: Int): Array[InternalRow] = finalPlan.executeTake(n)
+  override def executeToIterator(): Iterator[InternalRow] = finalPlan.executeToIterator()
+  override def doExecute(): RDD[InternalRow] = finalPlan.execute()
   override def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
@@ -120,7 +105,7 @@ case class AdaptiveSparkPlan(initialPlan: ResultQueryStage, session: SparkSessio
       prefix: String = "",
       addSuffix: Boolean = false,
       maxFields: Int): Unit = {
-    currentQueryStage.generateTreeString(
+    currentPlan.generateTreeString(
       depth, lastChildren, append, verbose, "", false, maxFields)
   }
 }
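The control flow of `finalPlan` is the interesting part: a background `QueryStageCreator` drives planning and materialization and reports back through callbacks, while the caller blocks on `readyLock` until either a `ResultQueryStage` or an error arrives. A self-contained sketch of that latch-plus-callback pattern (only `CountDownLatch` is real; the rest is invented for the sketch):

```scala
// Sketch of the synchronization in AdaptiveSparkPlan.finalPlan: a background
// worker reports progress through callbacks, and the caller blocks on a
// CountDownLatch until a final result or an error arrives.
import java.util.concurrent.CountDownLatch

object LatchDemo extends App {
  @volatile private var result: String = null
  @volatile private var error: Throwable = null
  private val ready = new CountDownLatch(1)

  // Callback pair: both paths must release the latch, or the caller hangs.
  def onFinalPlan(plan: String): Unit = { result = plan; ready.countDown() }
  def onFailure(e: Throwable): Unit = { error = e; ready.countDown() }

  // Stand-in for QueryStageCreator: do the work off-thread, then call back.
  new Thread(() => {
    try onFinalPlan("materialized final plan")
    catch { case e: Throwable => onFailure(e) }
  }).start()

  ready.await() // like finalPlan, block until the worker is done
  if (error != null) throw error
  println(result)
}
```

As the PR's comment notes, a latch is preferable to `wait()`/`notify()` here because `readyLock.getCount()` gives a cheap, race-free way to check whether the computation has already completed.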
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlan]], which executes the query plan
+ * adaptively with runtime data statistics. Note that this rule must be run after
+ * [[org.apache.spark.sql.execution.exchange.EnsureRequirements]], so that the exchange nodes are
+ * already inserted.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan] {
+
+  override def apply(plan: SparkPlan): SparkPlan = plan match {
+    case _: ExecutedCommandExec => plan

Review comment (Owner): We also need to support AE for ExecutedCommandExec. For example, the last
node may be a CreateHiveTableAsSelectCommand, and we also want to run the plan in AE mode.

Reply (Author): CreateHiveTableAsSelectCommand is not an ExecutedCommandExec anymore.

+    case _ if session.sessionState.conf.adaptiveExecutionEnabled =>
+      AdaptiveSparkPlan(plan, session.cloneSession())
+    case _ => plan // fall through when adaptive execution is disabled, avoiding a MatchError
+  }
+}

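The apply method's behavior is easy to state: commands pass through, other plans are wrapped in an adaptive root when `spark.sql.adaptive.enabled` is on, and anything else is left alone (without that fallback case the match would throw a `MatchError`). A stand-alone sketch of the same decision, with invented stand-in types rather than Spark's API:

```scala
// Sketch of InsertAdaptiveSparkPlan's decision, with invented stand-in types:
// commands pass through, other plans are wrapped in an adaptive root when the
// feature flag is on, and the plan is untouched otherwise.
sealed trait PhysicalNode
case class CommandNode(sql: String) extends PhysicalNode
case class QueryNode(sql: String) extends PhysicalNode
case class AdaptiveRoot(inner: PhysicalNode) extends PhysicalNode

def insertAdaptiveRoot(plan: PhysicalNode, adaptiveEnabled: Boolean): PhysicalNode =
  plan match {
    case c: CommandNode => c // commands execute eagerly; never wrap them
    case _ if adaptiveEnabled => AdaptiveRoot(plan) // hide the plan behind the root
    case _ => plan // feature disabled: leave the plan alone
  }

// insertAdaptiveRoot(QueryNode("SELECT 1"), adaptiveEnabled = true)
//   == AdaptiveRoot(QueryNode("SELECT 1"))
```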
This file was deleted.

@@ -29,8 +29,9 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.exchange._
 
 /**
- * In adaptive execution mode, an execution plan is divided into multiple QueryStages w.r.t. the
- * exchange as boundary. Each QueryStage is a sub-tree that runs in a single Spark stage.
+ * A query stage is an individual sub-tree of a query plan, which can be executed ahead of time
+ * and provide accurate data statistics. For example, a sub-tree under a shuffle/broadcast node is
+ * a query stage. Each query stage runs in a single Spark job/stage.
  */
 abstract class QueryStage extends LeafExecNode {
 
@@ -65,6 +66,7 @@ abstract class QueryStage extends LeafExecNode {
   override def executeToIterator(): Iterator[InternalRow] = plan.executeToIterator()
   override def doExecute(): RDD[InternalRow] = plan.execute()
   override def doExecuteBroadcast[T](): Broadcast[T] = plan.executeBroadcast()
+  override def doCanonicalize(): SparkPlan = plan.canonicalized
 
   // TODO: maybe we should not hide QueryStage entirely from explain result.
   override def generateTreeString(
@@ -86,7 +88,7 @@
 case class ResultQueryStage(id: Int, plan: SparkPlan) extends QueryStage {
 
   override def materialize(): Future[Any] = {
-    Future.unit
+    throw new IllegalStateException("Cannot materialize ResultQueryStage.")
   }
 
   override def withNewPlan(newPlan: SparkPlan): QueryStage = {
@@ -95,7 +97,7 @@ case class ResultQueryStage(id: Int, plan: SparkPlan) extends QueryStage {
 }
 
 /**
- * A shuffle QueryStage whose child is a ShuffleExchangeExec.
+ * A shuffle QueryStage whose child is a [[ShuffleExchangeExec]].
  */
 case class ShuffleQueryStage(id: Int, plan: ShuffleExchangeExec) extends QueryStage {
 
@@ -119,7 +121,7 @@ case class ShuffleQueryStage(id: Int, plan: ShuffleExchangeExec) extends QuerySt
 }
 
 /**
- * A broadcast QueryStage whose child is a BroadcastExchangeExec.
+ * A broadcast QueryStage whose child is a [[BroadcastExchangeExec]].
  */
 case class BroadcastQueryStage(id: Int, plan: BroadcastExchangeExec) extends QueryStage {
 
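Taken together, the `QueryStage` changes pin down a small contract: a stage wraps a plan sub-tree, `materialize()` starts it running ahead of time, `withNewPlan` re-creates the stage around an updated sub-tree, and the result stage is never materialized directly. A minimal sketch of that contract with invented types, not the real Spark classes:

```scala
// Sketch of the QueryStage contract: a stage wraps a plan sub-tree, can be
// materialized ahead of time (yielding a Future of its runtime statistics),
// and can be re-created around an updated sub-tree.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

case class Stats(rowCount: Long)
case class PlanNode(name: String)

sealed trait DemoStage {
  def id: Int
  def plan: PlanNode
  // Kick off execution of this sub-tree; completes when statistics are known.
  def materialize(): Future[Stats]
  def withNewPlan(newPlan: PlanNode): DemoStage
}

case class DemoShuffleStage(id: Int, plan: PlanNode) extends DemoStage {
  def materialize(): Future[Stats] = Future(Stats(rowCount = 42L))
  def withNewPlan(newPlan: PlanNode): DemoStage = copy(plan = newPlan)
}

// The result stage is never materialized on its own; like ResultQueryStage
// above, asking it to materialize is a programming error.
case class DemoResultStage(id: Int, plan: PlanNode) extends DemoStage {
  def materialize(): Future[Stats] =
    throw new IllegalStateException("Cannot materialize the result stage.")
  def withNewPlan(newPlan: PlanNode): DemoStage = copy(plan = newPlan)
}
```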