SparkPlanInfo.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution

 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryFragmentExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
 import org.apache.spark.sql.internal.SQLConf
@@ -53,8 +53,8 @@ private[execution] object SparkPlanInfo {
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
     val children = plan match {
       case ReusedExchangeExec(_, child) => child :: Nil
-      case a: AdaptiveSparkPlanExec => a.finalPlan.plan :: Nil
-      case fragment: QueryFragmentExec => fragment.plan :: Nil
+      case a: AdaptiveSparkPlanExec => a.finalPlan :: Nil
+      case fragment: QueryStageExec => fragment.plan :: Nil
       case _ => plan.children ++ plan.subqueries
     }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
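Aside: `fromSparkPlan` builds the tree of plan descriptions shown in the UI, and since `finalPlan` now returns a `SparkPlan` directly (rather than a result fragment exposing a `.plan`), the extra dereference goes away. A minimal sketch of the unwrapping pattern in that match, using hypothetical stand-in types (`Node`, `Wrapper`, `Stage`, `Info` are illustrative, not Spark's API):

```scala
// Wrapper plays the role of AdaptiveSparkPlanExec, Stage of QueryStageExec.
sealed trait Node { def name: String; def children: Seq[Node] }
case class Leaf(name: String) extends Node { def children: Seq[Node] = Nil }
case class Wrapper(name: String, finalPlan: Node) extends Node {
  def children: Seq[Node] = finalPlan :: Nil
}
case class Stage(name: String, plan: Node) extends Node {
  def children: Seq[Node] = plan :: Nil
}

case class Info(name: String, children: Seq[Info])

object Info {
  // Mirrors the match above: wrapper nodes contribute the plan they hold
  // as their single child when the description tree is built.
  def from(node: Node): Info = {
    val kids = node match {
      case Wrapper(_, finalPlan) => finalPlan :: Nil
      case Stage(_, plan) => plan :: Nil
      case other => other.children
    }
    Info(node.name, kids.map(from))
  }
}
```

For example, `Info.from(Wrapper("adaptive", Stage("stage-0", Leaf("scan"))))` yields `Info("adaptive", Seq(Info("stage-0", Seq(Info("scan", Nil)))))`.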
AdaptiveSparkPlanExec.scala
@@ -28,9 +28,11 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SparkPlanInfo, S
 import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate

 /**
- * A root node to execute the query plan adaptively. It creates query fragments, and incrementally
- * updates the query plan when a query fragment is materialized and provides accurate runtime
- * data statistics.
+ * A root node to execute the query plan adaptively. It splits the query plan into independent
+ * stages and executes them in order according to their dependencies. Each query stage
+ * materializes its output at the end. When one stage completes, the data statistics of its
+ * materialized output are used to optimize the subsequent stages.
+ * This is called mid-query re-optimization in the database literature.
  */
 case class AdaptiveSparkPlanExec(initialPlan: SparkPlan, session: SparkSession)
   extends LeafExecNode {
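The rewritten doc comment is the core idea of this PR: run stages in dependency order, and re-optimize the remaining query with the statistics of each materialized stage. A schematic of that loop, for orientation only; `Stage`, `Stats`, `runAdaptively`, and `reoptimize` are illustrative stand-ins, not the classes this PR adds:

```scala
// Schematic of mid-query re-optimization, not Spark's actual control loop.
// Assumes the stage graph is acyclic, so some stage is always ready.
object AdaptiveLoopSketch {
  case class Stats(rowCount: Long, sizeInBytes: Long)

  final class Stage(val id: Int, val dependencies: Seq[Stage]) {
    var result: Option[Stats] = None
    // Runnable once every dependency has materialized its output.
    def ready: Boolean = result.isEmpty && dependencies.forall(_.result.isDefined)
    def materialize(): Stats = Stats(0L, 0L) // placeholder for a shuffle/broadcast
  }

  def runAdaptively(stages: Seq[Stage], reoptimize: Seq[Stats] => Unit): Unit = {
    while (stages.exists(_.result.isEmpty)) {
      // Materialize every stage whose dependencies are satisfied...
      stages.filter(_.ready).foreach(s => s.result = Some(s.materialize()))
      // ...then re-plan the not-yet-executed part with the fresh statistics.
      reoptimize(stages.flatMap(_.result))
    }
  }
}
```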
@@ -40,34 +42,34 @@ case class AdaptiveSparkPlanExec(initialPlan: SparkPlan, session: SparkSession)
   @volatile private var currentPlan: SparkPlan = initialPlan
   @volatile private var error: Throwable = null

-  // We will release the lock when we finish planning query fragments, or we fail to do the
-  // planning. Getting `finalPlan` will be blocked until the lock is released.
+  // We will release the lock when all the query stages are completed, or we fail to
+  // optimize/execute query stages. Getting `finalPlan` will be blocked until the lock is released.
   // This is better than wait()/notify(), as we can easily check if the computation has completed,
   // by calling `readyLock.getCount()`.
   private val readyLock = new CountDownLatch(1)

-  private def createCallback(executionId: Option[Long]): QueryFragmentCreatorCallback = {
-    new QueryFragmentCreatorCallback {
-      override def onPlanUpdate(updatedPlan: SparkPlan): Unit = {
-        updateCurrentPlan(updatedPlan, executionId)
-        if (updatedPlan.isInstanceOf[ResultQueryFragmentExec]) readyLock.countDown()
-      }
+  private def createCallback(executionId: Option[Long]) = new QueryStageManagerCallback {
+    override def onPlanUpdate(updatedPlan: SparkPlan): Unit = {
+      updateCurrentPlan(updatedPlan, executionId)
+    }

-      override def onFragmentMaterializingFailed(
-          fragment: QueryFragmentExec,
-          e: Throwable): Unit = {
-        error = new SparkException(
-          s"""
-             |Fail to materialize fragment ${fragment.id}:
-             |${fragment.plan.treeString}
-           """.stripMargin, e)
-        readyLock.countDown()
-      }
+    override def onFinalPlan(finalPlan: SparkPlan): Unit = {
+      updateCurrentPlan(finalPlan, executionId)
+      readyLock.countDown()
+    }

-      override def onError(e: Throwable): Unit = {
-        error = e
-        readyLock.countDown()
-      }
-    }
-  }
+    override def onStageMaterializationFailed(stage: QueryStageExec, e: Throwable): Unit = {
+      error = new SparkException(
+        s"""
+           |Fail to materialize query stage ${stage.id}:
+           |${stage.plan.treeString}
+         """.stripMargin, e)
+      readyLock.countDown()
+    }
+
+    override def onError(e: Throwable): Unit = {
+      error = e
+      readyLock.countDown()
+    }
+  }
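The comment on `readyLock` above argues for `CountDownLatch` over `wait()`/`notify()` because completion can be probed without blocking. A minimal, self-contained sketch of that pattern (a hypothetical `OneShotResult`, not Spark code):

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical illustration of the readyLock pattern above: readers block
// until a producer publishes a value exactly once.
final class OneShotResult[T] {
  private val ready = new CountDownLatch(1)
  @volatile private var value: Option[T] = None

  def publish(v: T): Unit = {
    value = Some(v)
    ready.countDown() // wakes every thread blocked in get()
  }

  def get(): T = {
    ready.await() // blocks until publish() has run
    value.get
  }

  // Unlike wait()/notify(), completion is checkable without blocking.
  def isReady: Boolean = ready.getCount == 0
}
```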

@@ -81,18 +83,18 @@ case class AdaptiveSparkPlanExec(initialPlan: SparkPlan, session: SparkSession)
     }
   }

-  def finalPlan: ResultQueryFragmentExec = {
+  def finalPlan: SparkPlan = {
     if (readyLock.getCount > 0) {
       val sc = session.sparkContext
       val executionId = Option(sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
-      val creator = new QueryFragmentCreator(initialPlan, session, createCallback(executionId))
-      creator.start()
+      val stageManager = new QueryStageManager(initialPlan, session, createCallback(executionId))
+      stageManager.start()
       readyLock.await()
-      creator.stop()
+      stageManager.stop()
     }

     if (error != null) throw error
-    currentPlan.asInstanceOf[ResultQueryFragmentExec]
+    currentPlan
   }

   override def executeCollect(): Array[InternalRow] = finalPlan.executeCollect()
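Taken together, the overrides in this file imply the shape of the new callback interface. The trait itself lives elsewhere in this PR (not shown here), so the following is a reconstruction from the call sites above, not the file's actual contents:

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.QueryStageExec

// Reconstructed from the overrides above; the method set and signatures are
// inferred, and the real trait may carry additional members.
trait QueryStageManagerCallback {
  def onPlanUpdate(updatedPlan: SparkPlan): Unit
  def onFinalPlan(finalPlan: SparkPlan): Unit
  def onStageMaterializationFailed(stage: QueryStageExec, e: Throwable): Unit
  def onError(e: Throwable): Unit
}
```

Note how the `finalPlan` rewrite fits this interface: releasing `readyLock` now happens in `onFinalPlan`, instead of a type check against `ResultQueryFragmentExec` inside `onPlanUpdate`.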

This file was deleted.
