MapOutputStatistics.scala
@@ -25,4 +25,3 @@ package org.apache.spark
* (may be inexact due to use of compressed map statuses)
*/
private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
extends Serializable
QueryExecution.scala
@@ -94,11 +94,12 @@ class QueryExecution(
* row format conversions as needed.
*/
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
adaptivePreparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
val rules = if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
adaptivePreparations
} else {
preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
preparations
}
rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
}

/** A sequence of rules that will be applied in order to the physical plan before execution. */
@@ -109,14 +110,16 @@ class QueryExecution(
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf))

// With adaptive execution, whole stage codegen will be done inside `QueryStageExecutor`.
protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq(
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf),
// PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees
// by inserting leaf node QueryStageInput. Transforming the plan after applying this rule will
// by inserting leaf node QueryStage. Transforming the plan after applying this rule will
// only transform node in a sub-tree.
PlanQueryStage(sparkSession.sessionState.conf))
PlanQueryStage(sparkSession))

def simpleString: String = withRedaction {
val concat = new StringConcat()
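A minimal usage sketch of the switch above (assuming the standard `spark.sql.adaptive.enabled` config key is what backs `adaptiveExecutionEnabled`): with the flag on, prepareForExecution folds the plan through adaptivePreparations instead of preparations, so a shuffle-producing query ends up rooted at an AdaptiveSparkPlan.

import org.apache.spark.sql.SparkSession

object AdaptiveExecutionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("adaptive-execution-demo")
      // Assumed config key backing adaptiveExecutionEnabled; sketch only.
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // A query with a shuffle: with the flag on it is planned via adaptivePreparations,
    // ending in PlanQueryStage, so the executed plan is rooted at AdaptiveSparkPlan.
    val counts = spark.range(0, 1000).select(($"id" % 10).as("k")).groupBy("k").count()
    counts.explain()
    counts.collect()

    spark.stop()
  }
}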
SparkPlanInfo.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.adaptive.QueryStageInput
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlan, QueryStage}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.sql.internal.SQLConf
@@ -53,7 +53,8 @@ private[execution] object SparkPlanInfo {
def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
val children = plan match {
case ReusedExchangeExec(_, child) => child :: Nil
case i: QueryStageInput => i.childStage :: Nil
case a: AdaptiveSparkPlan => a.resultStage.plan :: Nil
case stage: QueryStage => stage.plan :: Nil
case _ => plan.children ++ plan.subqueries
}
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
AdaptiveSparkPlan.scala (new file)
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.adaptive

import java.util.concurrent.CountDownLatch

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate

/**
* A root node to trigger query stages and execute the query plan adaptively. It incrementally
* updates the query plan when a query stage is materialized and provides accurate runtime
* statistics.
*/
case class AdaptiveSparkPlan(initialPlan: ResultQueryStage, session: SparkSession)
extends LeafExecNode {
Do we need special consideration in SparkPlanInfo.fromSparkPlan?

  def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
    val children = plan match {
      case ReusedExchangeExec(_, child) => child :: Nil
      case stage: QueryStage => stage.plan :: Nil
      case adaptive: AdaptiveSparkPlan => adaptive.plan :: Nil
      case _ => plan.children ++ plan.subqueries
    }

Author
Yes, we do. Good catch!
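For reference, the resolution is the SparkPlanInfo.scala hunk earlier in this diff, which adds a case for each of the two new node types:

case a: AdaptiveSparkPlan => a.resultStage.plan :: Nil
case stage: QueryStage => stage.plan :: Nil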


override def output: Seq[Attribute] = initialPlan.output

@volatile private var currentQueryStage: QueryStage = initialPlan
@volatile private var error: Throwable = null
private val readyLock = new CountDownLatch(1)

private def replaceStage(oldStage: QueryStage, newStage: QueryStage): QueryStage = {
if (oldStage.id == newStage.id) {
newStage
} else {
val newPlanForOldStage = oldStage.plan.transform {
case q: QueryStage => replaceStage(q, newStage)
}
oldStage.withNewPlan(newPlanForOldStage)
}
}

private def createCallback(executionId: Option[Long]): QueryStageTriggerCallback = {
new QueryStageTriggerCallback {
override def onStageUpdated(stage: QueryStage): Unit = {
updateCurrentQueryStage(stage, executionId)
if (stage.isInstanceOf[ResultQueryStage]) readyLock.countDown()
}

override def onStagePlanningFailed(stage: QueryStage, e: Throwable): Unit = {
error = new RuntimeException(
s"""
|Fail to plan stage ${stage.id}:
|${stage.plan.treeString}
""".stripMargin, e)
readyLock.countDown()
}

override def onStageMaterializingFailed(stage: QueryStage, e: Throwable): Unit = {
error = new RuntimeException(
s"""
|Fail to materialize stage ${stage.id}:
|${stage.plan.treeString}
""".stripMargin, e)
readyLock.countDown()
}

override def onError(e: Throwable): Unit = {
error = e
readyLock.countDown()
}
}
}

private def updateCurrentQueryStage(newStage: QueryStage, executionId: Option[Long]): Unit = {
currentQueryStage = replaceStage(currentQueryStage, newStage)
executionId.foreach { id =>
session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate(
id,
SQLExecution.getQueryExecution(id).toString,
SparkPlanInfo.fromSparkPlan(currentQueryStage)))
}
}

def resultStage: ResultQueryStage = {
if (readyLock.getCount > 0) {
val sc = session.sparkContext
val executionId = Option(sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
val trigger = new QueryStageTrigger(session, createCallback(executionId))
trigger.start()
trigger.trigger(initialPlan)
readyLock.await()
trigger.stop()
}

if (error != null) throw error
currentQueryStage.asInstanceOf[ResultQueryStage]
}

override def executeCollect(): Array[InternalRow] = resultStage.executeCollect()
override def executeTake(n: Int): Array[InternalRow] = resultStage.executeTake(n)
override def executeToIterator(): Iterator[InternalRow] = resultStage.executeToIterator()
override def doExecute(): RDD[InternalRow] = resultStage.execute()
override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int): Unit = {
currentQueryStage.generateTreeString(
depth, lastChildren, append, verbose, "", false, maxFields)
}
}
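`QueryStageTrigger` and `QueryStageTriggerCallback` are not part of the hunks shown here. Judging only from how `createCallback` uses them above, the callback contract is roughly the following sketch (a trait inferred from usage; it may have members not visible in this file):

// Inferred sketch of the callback contract driven by QueryStageTrigger.
trait QueryStageTriggerCallback {
  // A stage has been (re)planned or materialized; a ResultQueryStage means the whole
  // query is ready, and AdaptiveSparkPlan releases its latch.
  def onStageUpdated(stage: QueryStage): Unit

  // Planning a stage failed.
  def onStagePlanningFailed(stage: QueryStage, e: Throwable): Unit

  // Materializing (running) a stage failed.
  def onStageMaterializingFailed(stage: QueryStage, e: Throwable): Unit

  // Any other error while driving the stages.
  def onError(e: Throwable): Unit
}

`resultStage` above then blocks on the CountDownLatch until one of these callbacks either installs a ResultQueryStage or records an error.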
PlanQueryStage.scala
@@ -17,65 +17,41 @@

package org.apache.spark.sql.execution.adaptive

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ShuffleExchangeExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec}

/**
* Divide the spark plan into multiple QueryStages. For each Exchange in the plan, it adds a
* QueryStage and a QueryStageInput. If reusing Exchange is enabled, it finds duplicated exchanges
* and uses the same QueryStage for all the references. Note this rule must be run after
* EnsureRequirements rule. The rule divides the plan into multiple sub-trees as QueryStageInput
* is a leaf node. Transforming the plan after applying this rule will only transform node in a
* sub-tree.
* Divide the spark plan into multiple QueryStages. For each Exchange in the plan, it wraps it with
* a [[QueryStage]]. At the end it adds an [[AdaptiveSparkPlan]] at the top, which will drive the
* execution of query stages.
*/
case class PlanQueryStage(conf: SQLConf) extends Rule[SparkPlan] {
case class PlanQueryStage(session: SparkSession) extends Rule[SparkPlan] {

def apply(plan: SparkPlan): SparkPlan = {

val newPlan = if (!conf.exchangeReuseEnabled) {
plan.transformUp {
case e: ShuffleExchangeExec =>
ShuffleQueryStageInput(ShuffleQueryStage(e), e.output)
case e: BroadcastExchangeExec =>
BroadcastQueryStageInput(BroadcastQueryStage(e), e.output)
}
} else {
// Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
val stages = mutable.HashMap[StructType, ArrayBuffer[QueryStage]]()

plan.transformUp {
case exchange: Exchange =>
val sameSchema = stages.getOrElseUpdate(exchange.schema, ArrayBuffer[QueryStage]())
val samePlan = sameSchema.find { s =>
exchange.sameResult(s.child)
}
if (samePlan.isDefined) {
// Keep the output of this exchange, the following plans require that to resolve
// attributes.
exchange match {
case e: ShuffleExchangeExec => ShuffleQueryStageInput(
samePlan.get.asInstanceOf[ShuffleQueryStage], exchange.output)
case e: BroadcastExchangeExec => BroadcastQueryStageInput(
samePlan.get.asInstanceOf[BroadcastQueryStage], exchange.output)
}
} else {
val queryStageInput = exchange match {
case e: ShuffleExchangeExec =>
ShuffleQueryStageInput(ShuffleQueryStage(e), e.output)
case e: BroadcastExchangeExec =>
BroadcastQueryStageInput(BroadcastQueryStage(e), e.output)
}
sameSchema += queryStageInput.childStage
queryStageInput
}
}
var id = 0
val exchangeToQueryStage = new java.util.IdentityHashMap[Exchange, QueryStage]
val planWithStages = plan.transformUp {
case e: ShuffleExchangeExec =>
val queryStage = ShuffleQueryStage(id, e)
id += 1
exchangeToQueryStage.put(e, queryStage)
queryStage
case e: BroadcastExchangeExec =>
val queryStage = BroadcastQueryStage(id, e)
id += 1
exchangeToQueryStage.put(e, queryStage)
queryStage
// The `ReusedExchangeExec` was added in the rule `ReuseExchange`, via transforming up the
// query plan. This rule also transform up the query plan, so when we hit `ReusedExchangeExec`
// here, the exchange being reused must already be hit before and there should be an entry
// for it in `exchangeToQueryStage`.
case e: ReusedExchangeExec =>
val existingQueryStage = exchangeToQueryStage.get(e.child)
assert(existingQueryStage != null, "The exchange being reused should be hit before.")
ReusedQueryStage(existingQueryStage, e.output)
}
ResultQueryStage(newPlan)
AdaptiveSparkPlan(ResultQueryStage(id, planWithStages), session)
}
}
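To make the rule's effect concrete, here is a hypothetical before/after shape (illustrative only, not taken from the PR's tests). Per the code above, every Exchange is wrapped in a query stage with a fresh id, every ReusedExchangeExec becomes a ReusedQueryStage pointing at the stage created for the exchange it reuses, and the whole plan is rooted at AdaptiveSparkPlan(ResultQueryStage(...)):

Before (after EnsureRequirements and ReuseExchange):
  SortMergeJoin
  :- Sort
  :  +- ShuffleExchangeExec
  :     +- Scan A
  +- Sort
     +- ReusedExchangeExec  (reuses the ShuffleExchangeExec above)

After PlanQueryStage:
  AdaptiveSparkPlan
  +- ResultQueryStage(id = 1)
     +- SortMergeJoin
        :- Sort
        :  +- ShuffleQueryStage(id = 0)
        :     +- ShuffleExchangeExec
        :        +- Scan A
        +- Sort
           +- ReusedQueryStage  (reuses stage 0)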