From 1721d998d70c110c298f865f8c4369288b225b96 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 14 Mar 2019 01:17:43 +0800 Subject: [PATCH] improve --- .../spark/sql/execution/SparkPlanInfo.scala | 6 +- .../adaptive/AdaptiveSparkPlanExec.scala | 60 ++-- .../adaptive/QueryFragmentCreator.scala | 260 ------------------ ...ragmentExec.scala => QueryStageExec.scala} | 63 ++--- .../adaptive/QueryStageManager.scala | 220 +++++++++++++++ ...la => AssertChildStagesMaterialized.scala} | 12 +- .../rule/ReduceNumShufflePartitions.scala | 41 +-- .../rule/RemoveRedundantShuffles.scala | 39 +++ .../ReduceNumShufflePartitionsSuite.scala | 25 +- 9 files changed, 351 insertions(+), 375 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryFragmentCreator.scala rename sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/{QueryFragmentExec.scala => QueryStageExec.scala} (67%) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageManager.scala rename sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/{AssertChildFragmentsMaterialized.scala => AssertChildStagesMaterialized.scala} (70%) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/RemoveRedundantShuffles.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 0fc1358ec252..21ef289fa18f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryFragmentExec} +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.SQLMetricInfo import org.apache.spark.sql.internal.SQLConf @@ -53,8 +53,8 @@ private[execution] object SparkPlanInfo { def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = { val children = plan match { case ReusedExchangeExec(_, child) => child :: Nil - case a: AdaptiveSparkPlanExec => a.finalPlan.plan :: Nil - case fragment: QueryFragmentExec => fragment.plan :: Nil + case a: AdaptiveSparkPlanExec => a.finalPlan :: Nil + case fragment: QueryStageExec => fragment.plan :: Nil case _ => plan.children ++ plan.subqueries } val metrics = plan.metrics.toSeq.map { case (key, metric) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 7da258842286..ae6290782fe3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -28,9 +28,11 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SparkPlanInfo, S import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate /** - * A root node to execute the query plan adaptively. It creates query fragments, and incrementally - * updates the query plan when a query fragment is materialized and provides accurate runtime - * data statistics. 
+ * A root node to execute the query plan adaptively. It splits the query plan into independent
+ * stages and executes them in order according to their dependencies. Each query stage
+ * materializes its output at the end. When one stage completes, the data statistics of its
+ * materialized output will be used to optimize the subsequent stages.
+ * This is called mid-query re-optimization in database literature.
  */
 case class AdaptiveSparkPlanExec(initialPlan: SparkPlan, session: SparkSession)
   extends LeafExecNode{
@@ -40,34 +42,34 @@ case class AdaptiveSparkPlanExec(initialPlan: SparkPlan, session: SparkSession)
   @volatile private var currentPlan: SparkPlan = initialPlan
   @volatile private var error: Throwable = null
-  // We will release the lock when we finish planning query fragments, or we fail to do the
-  // planning. Getting `finalPlan` will be blocked until the lock is release.
+  // We will release the lock when all the query stages are completed, or we fail to
+  // optimize/execute query stages. Getting `finalPlan` will be blocked until the lock is released.
   // This is better than wait()/notify(), as we can easily check if the computation has completed,
   // by calling `readyLock.getCount()`.
   private val readyLock = new CountDownLatch(1)
-  private def createCallback(executionId: Option[Long]): QueryFragmentCreatorCallback = {
-    new QueryFragmentCreatorCallback {
-      override def onPlanUpdate(updatedPlan: SparkPlan): Unit = {
-        updateCurrentPlan(updatedPlan, executionId)
-        if (updatedPlan.isInstanceOf[ResultQueryFragmentExec]) readyLock.countDown()
-      }
+  private def createCallback(executionId: Option[Long]) = new QueryStageManagerCallback {
+    override def onPlanUpdate(updatedPlan: SparkPlan): Unit = {
+      updateCurrentPlan(updatedPlan, executionId)
+    }
+
+    override def onFinalPlan(finalPlan: SparkPlan): Unit = {
+      updateCurrentPlan(finalPlan, executionId)
+      readyLock.countDown()
+    }
-      override def onFragmentMaterializingFailed(
-          fragment: QueryFragmentExec,
-          e: Throwable): Unit = {
-        error = new SparkException(
-          s"""
-             |Fail to materialize fragment ${fragment.id}:
-             |${fragment.plan.treeString}
+    override def onStageMaterializationFailed(stage: QueryStageExec, e: Throwable): Unit = {
+      error = new SparkException(
+        s"""
+           |Failed to materialize query stage ${stage.id}:
+           |${stage.plan.treeString}
          """.stripMargin, e)
-        readyLock.countDown()
-      }
+      readyLock.countDown()
+    }
-      override def onError(e: Throwable): Unit = {
-        error = e
-        readyLock.countDown()
-      }
+    override def onError(e: Throwable): Unit = {
+      error = e
+      readyLock.countDown()
     }
   }
@@ -81,18 +83,18 @@ case class AdaptiveSparkPlanExec(initialPlan: SparkPlan, session: SparkSession)
     }
   }
-  def finalPlan: ResultQueryFragmentExec = {
+  def finalPlan: SparkPlan = {
     if (readyLock.getCount > 0) {
       val sc = session.sparkContext
       val executionId = Option(sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
-      val creator = new QueryFragmentCreator(initialPlan, session, createCallback(executionId))
-      creator.start()
+      val stageManager = new QueryStageManager(initialPlan, session, createCallback(executionId))
+      stageManager.start()
       readyLock.await()
-      creator.stop()
+      stageManager.stop()
     }
     if (error != null) throw error
-    currentPlan.asInstanceOf[ResultQueryFragmentExec]
+    currentPlan
   }
   override def executeCollect(): Array[InternalRow] = finalPlan.executeCollect()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryFragmentCreator.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryFragmentCreator.scala deleted file mode 100644 index 49f7a271872b..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryFragmentCreator.scala +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.adaptive - -import scala.collection.mutable -import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService} - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{CollapseCodegenStages, SparkPlan} -import org.apache.spark.sql.execution.adaptive.rule.{AssertChildFragmentsMaterialized, ReduceNumShufflePartitions} -import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ShuffleExchangeExec} -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.{EventLoop, ThreadUtils} - -/** - * This class dynamically creates [[QueryFragmentExec]] bottom-up, optimize the query plan of query - * fragments and materialize them. It creates as many query fragments as possible at the same time, - * and materialize a query fragment when all its child fragments are materialized. - * - * To create query fragments, we traverse the query tree bottom up. When we hit an exchange node, - * and all the child query fragments of this exchange node are materialized, we try to create a new - * query fragment for this exchange node. - * - * To create a new query fragment, we first optimize the sub-tree of the exchange. After - * optimization, we check the output partitioning of the optimized sub-tree, and see if the - * exchange node is still necessary. - * - * If the exchange node becomes unnecessary, remove it and give up this query fragment creation, - * and continue to traverse the query plan tree until we hit the next exchange node. - * - * If the exchange node is still needed, create the query fragment and optimize its sub-tree again. - * It's necessary to have both the pre-creation optimization and post-creation optimization, because - * these 2 optimization have different assumptions. For pre-creation optimization, the shuffle node - * may be removed later on and the current sub-tree may be only a part of a query fragment, so we - * don't have the big picture of the query fragment yet. For post-creation optimization, the query - * fragment is created and we have the big picture of the query fragment. - * - * After the query fragment is optimized, we materialize it asynchronously, and continue to traverse - * the query plan tree to create more query fragments. 
- * - * When a query fragment completes materialization, we trigger the process of query fragments - * creation and traverse the query plan tree again. - */ -class QueryFragmentCreator( - initialPlan: SparkPlan, - session: SparkSession, - callback: QueryFragmentCreatorCallback) - extends EventLoop[QueryFragmentCreatorEvent]("QueryFragmentCreator") { - - private def conf = session.sessionState.conf - - private val readyFragments = mutable.HashSet.empty[Int] - - private var currentFragmentId = 0 - - private val fragmentCache = - mutable.HashMap.empty[StructType, mutable.Buffer[(Exchange, QueryFragmentExec)]] - - // The optimizer rules that will be applied to a sub-tree of the query plan before the fragment is - // created. Note that we may end up not creating the query fragment, so the rules here should not - // assume the given sub-plan-tree is the entire query plan of the query fragment. For example, if - // a rule want to collect all the child query fragments, it should not be put here. - private val preFragmentCreationOptimizerRules: Seq[Rule[SparkPlan]] = Seq( - AssertChildFragmentsMaterialized - ) - - // The optimizer rules that will be applied to a sub-tree of the query plan after the fragment is - // created. Note that once the fragment is created, we will not remove it anymore. If a rule - // changes the output partitioning of the sub-plan-tree, which may help to remove the exchange - // node, it's better to put it in `preFragmentCreationOptimizerRules`, so that we may create less - // query fragments. - private val postFragmentCreationOptimizerRules: Seq[Rule[SparkPlan]] = Seq( - ReduceNumShufflePartitions(conf), - CollapseCodegenStages(conf)) - - private var currentPlan = initialPlan - - private val localProperties = session.sparkContext.getLocalProperties - - private implicit def executionContext: ExecutionContextExecutorService = { - QueryFragmentCreator.executionContext - } - - override protected def onReceive(event: QueryFragmentCreatorEvent): Unit = event match { - case StartCreation => - // set active session and local properties for the event loop thread. - SparkSession.setActiveSession(session) - session.sparkContext.setLocalProperties(localProperties) - currentPlan = createQueryFragments(initialPlan) - - case MaterializeFragment(fragment) => - fragment.materialize().onComplete { res => - if (res.isSuccess) { - post(FragmentReady(fragment)) - } else { - callback.onFragmentMaterializingFailed(fragment, res.failed.get) - stop() - } - } - - case FragmentReady(fragment) => - if (fragment.isInstanceOf[ResultQueryFragmentExec]) { - callback.onPlanUpdate(fragment) - stop() - } else { - readyFragments += fragment.id - currentPlan = createQueryFragments(currentPlan) - } - } - - override protected def onStart(): Unit = { - post(StartCreation) - } - - private def preFragmentCreationOptimize(plan: SparkPlan): SparkPlan = { - preFragmentCreationOptimizerRules.foldLeft(plan) { - case (current, rule) => rule(current) - } - } - - private def postFragmentCreationOptimize(plan: SparkPlan): SparkPlan = { - postFragmentCreationOptimizerRules.foldLeft(plan) { - case (current, rule) => rule(current) - } - } - - /** - * Traverse the query plan bottom-up, and creates query fragments as many as possible. 
- */ - private def createQueryFragments(plan: SparkPlan): SparkPlan = { - val result = createQueryFragments0(plan) - if (result.allChildFragmentsReady) { - val finalPlan = postFragmentCreationOptimize(preFragmentCreationOptimize(result.newPlan)) - post(FragmentReady(ResultQueryFragmentExec(currentFragmentId, finalPlan))) - finalPlan - } else { - callback.onPlanUpdate(result.newPlan) - result.newPlan - } - } - - /** - * This method is called recursively to traverse the plan tree bottom-up. This method returns two - * information: 1) the new plan after we insert query fragments. 2) whether or not the child query - * fragments of the new plan are all ready. - * - * if the current plan is an exchange node, and all its child query fragments are ready, we try to - * create a new query fragment. - */ - private def createQueryFragments0(plan: SparkPlan): CreateFragmentResult = plan match { - case e: Exchange => - val similarFragments = fragmentCache.getOrElseUpdate(e.schema, mutable.Buffer.empty) - similarFragments.find(_._1.sameResult(e)) match { - case Some((_, existingFragment)) if conf.exchangeReuseEnabled => - CreateFragmentResult( - newPlan = ReusedQueryFragmentExec(existingFragment, e.output), - allChildFragmentsReady = readyFragments.contains(existingFragment.id)) - - case _ => - val result = createQueryFragments0(e.child) - // Try to create a query fragment only when all the child query fragments are ready. - if (result.allChildFragmentsReady) { - val optimizedPlan = preFragmentCreationOptimize(result.newPlan) - e match { - case s: ShuffleExchangeExec => - (s.desiredPartitioning, optimizedPlan.outputPartitioning) match { - case (desired: HashPartitioning, actual: HashPartitioning) - if desired.semanticEquals(actual) => - // This shuffle exchange is unnecessary now, remove it. The reason maybe: - // 1. the child plan has changed its output partitioning after optimization, - // and makes this exchange node unnecessary. - // 2. this exchange node is user specified, which turns out to be unnecessary. - CreateFragmentResult(newPlan = optimizedPlan, allChildFragmentsReady = true) - case _ => - val queryFragment = createQueryFragment(s.copy(child = optimizedPlan)) - similarFragments.append(e -> queryFragment) - // We've created a new fragment, which is obviously not ready yet. - CreateFragmentResult(newPlan = queryFragment, allChildFragmentsReady = false) - } - - case b: BroadcastExchangeExec => - val queryFragment = createQueryFragment(b.copy(child = optimizedPlan)) - similarFragments.append(e -> queryFragment) - // We've created a new fragment, which is obviously not ready yet. 
- CreateFragmentResult(newPlan = queryFragment, allChildFragmentsReady = false) - } - } else { - CreateFragmentResult( - newPlan = e.withNewChildren(Seq(result.newPlan)), - allChildFragmentsReady = false) - } - } - - case q: QueryFragmentExec => - CreateFragmentResult(newPlan = q, allChildFragmentsReady = readyFragments.contains(q.id)) - - case _ => - if (plan.children.isEmpty) { - CreateFragmentResult(newPlan = plan, allChildFragmentsReady = true) - } else { - val results = plan.children.map(createQueryFragments0) - CreateFragmentResult( - newPlan = plan.withNewChildren(results.map(_.newPlan)), - allChildFragmentsReady = results.forall(_.allChildFragmentsReady)) - } - } - - private def createQueryFragment(e: Exchange): QueryFragmentExec = { - val optimizedPlan = postFragmentCreationOptimize(e.child) - val queryFragment = e match { - case s: ShuffleExchangeExec => - ShuffleQueryFragmentExec(currentFragmentId, s.copy(child = optimizedPlan)) - case b: BroadcastExchangeExec => - BroadcastQueryFragmentExec(currentFragmentId, b.copy(child = optimizedPlan)) - } - currentFragmentId += 1 - post(MaterializeFragment(queryFragment)) - queryFragment - } - - override protected def onError(e: Throwable): Unit = callback.onError(e) -} - -case class CreateFragmentResult(newPlan: SparkPlan, allChildFragmentsReady: Boolean) - -object QueryFragmentCreator { - private val executionContext = ExecutionContext.fromExecutorService( - ThreadUtils.newDaemonCachedThreadPool("QueryFragmentCreator", 16)) -} - -trait QueryFragmentCreatorCallback { - def onPlanUpdate(updatedPlan: SparkPlan): Unit - def onFragmentMaterializingFailed(fragment: QueryFragmentExec, e: Throwable): Unit - def onError(e: Throwable): Unit -} - -sealed trait QueryFragmentCreatorEvent - -object StartCreation extends QueryFragmentCreatorEvent - -case class MaterializeFragment(fragment: QueryFragmentExec) extends QueryFragmentCreatorEvent - -case class FragmentReady(fragment: QueryFragmentExec) extends QueryFragmentCreatorEvent diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryFragmentExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala similarity index 67% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryFragmentExec.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index cbad5710c469..80c623ea9440 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryFragmentExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -29,32 +29,38 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.exchange._ /** - * A query fragment is an individual sub-tree of a query plan, which can be executed ahead and - * provide accurate data statistics. For example, a sub-tree under shuffle/broadcast node is a - * query fragment. Each query fragment runs in a single Spark job/stage. + * A query stage is an independent subgraph of the query plan. Query stage materializes its output + * before proceeding with further operators of the query plan. The data statistics of the + * materialized output can be used to optimize subsequent query stages. + * + * There are 2 kinds of query stages: + * 1. Shuffle query stage. This stage materializes its output to shuffle files, and Spark launches + * another job to execute the further operators. + * 2. Broadcast stage. This stage materializes its output to an array in driver JVM. 
Spark + * broadcasts the array before executing the further operators. */ -abstract class QueryFragmentExec extends LeafExecNode { +abstract class QueryStageExec extends LeafExecNode { /** - * An id of this query fragment which is unique in the entire query plan. + * An id of this query stage which is unique in the entire query plan. */ def id: Int /** - * The sub-tree of the query plan that belongs to this query fragment. + * The sub-tree of the query plan that belongs to this query stage. */ def plan: SparkPlan /** - * Returns a new query fragment with a new plan, which is optimized based on accurate runtime + * Returns a new query stage with a new plan, which is optimized based on accurate runtime data * statistics. */ - def withNewPlan(newPlan: SparkPlan): QueryFragmentExec + def withNewPlan(newPlan: SparkPlan): QueryStageExec /** - * Materialize this QueryFragment, to prepare for the execution, like submitting map stages, + * Materialize this query stage, to prepare for the execution, like submitting map stages, * broadcasting data, etc. The caller side can use the returned [[Future]] to wait until this - * fragment is ready. + * stage is ready. */ def materialize(): Future[Any] @@ -68,7 +74,7 @@ abstract class QueryFragmentExec extends LeafExecNode { override def doExecuteBroadcast[T](): Broadcast[T] = plan.executeBroadcast() override def doCanonicalize(): SparkPlan = plan.canonicalized - // TODO: maybe we should not hide QueryFragment entirely from explain result. + // TODO: maybe we should not hide query stage entirely from explain result. override def generateTreeString( depth: Int, lastChildren: Seq[Boolean], @@ -83,25 +89,11 @@ abstract class QueryFragmentExec extends LeafExecNode { } /** - * The last QueryFragment of an execution plan. - */ -case class ResultQueryFragmentExec(id: Int, plan: SparkPlan) extends QueryFragmentExec { - - override def materialize(): Future[Any] = { - throw new IllegalStateException("Cannot materialize ResultQueryFragment.") - } - - override def withNewPlan(newPlan: SparkPlan): QueryFragmentExec = { - copy(plan = newPlan) - } -} - -/** - * A shuffle QueryFragment whose child is a [[ShuffleExchangeExec]]. + * A shuffle query stage whose child is a [[ShuffleExchangeExec]]. */ -case class ShuffleQueryFragmentExec(id: Int, plan: ShuffleExchangeExec) extends QueryFragmentExec { +case class ShuffleQueryStageExec(id: Int, plan: ShuffleExchangeExec) extends QueryStageExec { - override def withNewPlan(newPlan: SparkPlan): QueryFragmentExec = { + override def withNewPlan(newPlan: SparkPlan): QueryStageExec = { copy(plan = newPlan.asInstanceOf[ShuffleExchangeExec]) } @@ -121,12 +113,11 @@ case class ShuffleQueryFragmentExec(id: Int, plan: ShuffleExchangeExec) extends } /** - * A broadcast QueryFragment whose child is a [[BroadcastExchangeExec]]. + * A broadcast query stage whose child is a [[BroadcastExchangeExec]]. */ -case class BroadcastQueryFragmentExec(id: Int, plan: BroadcastExchangeExec) - extends QueryFragmentExec { +case class BroadcastQueryStageExec(id: Int, plan: BroadcastExchangeExec) extends QueryStageExec { - override def withNewPlan(newPlan: SparkPlan): QueryFragmentExec = { + override def withNewPlan(newPlan: SparkPlan): QueryStageExec = { copy(plan = newPlan.asInstanceOf[BroadcastExchangeExec]) } @@ -136,10 +127,10 @@ case class BroadcastQueryFragmentExec(id: Int, plan: BroadcastExchangeExec) } /** - * A wrapper of QueryFragment to indicate that it's reused. Note that this is not a query fragment. 
+ * A wrapper of a query stage to indicate that it's reused. Note that this is not a query stage.
  */
-case class ReusedQueryFragmentExec(child: QueryFragmentExec, output: Seq[Attribute])
-  extends LeafExecNode {
+case class ReusedQueryStageExec(child: SparkPlan, output: Seq[Attribute])
+  extends UnaryExecNode {
   // Ignore this wrapper for canonicalizing.
   override def doCanonicalize(): SparkPlan = child.canonicalized
@@ -152,7 +143,7 @@ case class ReusedQueryFragmentExec(child: QueryFragmentExec, output: Seq[Attribu
     child.executeBroadcast()
   }
-  // `ReusedQueryFragment` can have distinct set of output attribute ids from its child, we need
+  // `ReusedQueryStageExec` can have a distinct set of output attribute ids from its child, we need
   // to update the attribute ids in `outputPartitioning` and `outputOrdering`.
   private lazy val updateAttr: Expression => Expression = {
     val originalAttrToNewAttr = AttributeMap(child.output.zip(output))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageManager.scala
new file mode 100644
index 000000000000..49a602af0a71
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageManager.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{CollapseCodegenStages, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.rule._
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{EventLoop, ThreadUtils}
+
+/**
+ * This class inserts [[QueryStageExec]] into the query plan in a bottom-up fashion, and
+ * materializes the query stages asynchronously as soon as they are created.
+ *
+ * When one query stage finishes materialization, a list of adaptive optimizer rules will be
+ * executed, trying to optimize the query plan with the data statistics collected from the
+ * materialized data. Then we traverse the query plan again and try to insert more query stages.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit an exchange node,
+ * and all the child query stages of this exchange node are materialized, we create a new
+ * query stage for this exchange node.
+ *
+ * Right before the stage creation, a list of query stage optimizer rules will be executed. These
+ * optimizer rules are different from the adaptive optimizer rules. Query stage optimizer rules only
+ * focus on a plan sub-tree of a specific query stage, and they will be executed only after all the
+ * child stages are materialized.
+ */
+class QueryStageManager(
+    initialPlan: SparkPlan,
+    session: SparkSession,
+    callback: QueryStageManagerCallback)
+  extends EventLoop[QueryStageManagerEvent]("QueryStageManager") {
+
+  private def conf = session.sessionState.conf
+
+  private val readyStages = mutable.HashSet.empty[Int]
+
+  private var currentStageId = 0
+
+  private val stageCache =
+    mutable.HashMap.empty[StructType, mutable.Buffer[(Exchange, QueryStageExec)]]
+
+  private var currentPlan = initialPlan
+
+  private val localProperties = session.sparkContext.getLocalProperties
+
+  private implicit def executionContext: ExecutionContextExecutorService = {
+    QueryStageManager.executionContext
+  }
+
+  // A list of optimizer rules that will be applied when a query stage finishes materialization.
+  // These rules need to traverse the entire query plan, and find chances to optimize the query plan
+  // with the data statistics collected from the materialized query stages' output.
+  private val adaptiveOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
+    RemoveRedundantShuffles)
+
+  // A list of optimizer rules that will be applied right before a query stage is created.
+  // These rules need to traverse the plan sub-tree of the query stage to be created, and find
+  // chances to optimize this query stage given all its child query stages.
+  private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
+    AssertChildStagesMaterialized,
+    ReduceNumShufflePartitions(conf),
+    CollapseCodegenStages(conf))
+
+  private def optimizeEntirePlan(plan: SparkPlan): SparkPlan = {
+    adaptiveOptimizerRules.foldLeft(plan) {
+      case (current, rule) => rule(current)
+    }
+  }
+
+  private def optimizeQueryStage(plan: SparkPlan): SparkPlan = {
+    queryStageOptimizerRules.foldLeft(plan) {
+      case (current, rule) => rule(current)
+    }
+  }
+
+  override protected def onReceive(event: QueryStageManagerEvent): Unit = event match {
+    case Start =>
+      // set active session and local properties for the event loop thread.
+      SparkSession.setActiveSession(session)
+      session.sparkContext.setLocalProperties(localProperties)
+      currentPlan = createQueryStages(initialPlan)
+
+    case MaterializeStage(stage) =>
+      stage.materialize().onComplete { res =>
+        if (res.isSuccess) {
+          post(StageReady(stage))
+        } else {
+          callback.onStageMaterializationFailed(stage, res.failed.get)
+          stop()
+        }
+      }
+
+    case StageReady(stage) =>
+      readyStages += stage.id
+      currentPlan = optimizeEntirePlan(currentPlan)
+      currentPlan = createQueryStages(currentPlan)
+  }
+
+  override protected def onStart(): Unit = {
+    post(Start)
+  }
+
+  /**
+   * Traverses the query plan bottom-up, and creates as many query stages as possible.
+   */
+  private def createQueryStages(plan: SparkPlan): SparkPlan = {
+    val result = createQueryStages0(plan)
+    if (result.allChildStagesReady) {
+      val finalPlan = optimizeQueryStage(result.newPlan)
+      callback.onFinalPlan(finalPlan)
+      finalPlan
+    } else {
+      callback.onPlanUpdate(result.newPlan)
+      result.newPlan
+    }
+  }
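Editor's note: `optimizeEntirePlan` and `optimizeQueryStage` above apply their rule lists as a left fold, so each rule sees the plan produced by the previous one and rule order matters. The snippet below is a minimal, Spark-independent sketch of that fold-over-rules pattern; `Plan`, `AddOne` and `Double` are made-up stand-ins for illustration only, not classes from this patch.

object RuleFoldSketch {
  // Minimal stand-in for Catalyst's Rule[SparkPlan]: a plan-to-plan function.
  trait Rule[T] { def apply(plan: T): T }

  // A toy "plan" carrying a single value, just to make the fold observable.
  final case class Plan(value: Int)

  object AddOne extends Rule[Plan] { def apply(plan: Plan): Plan = plan.copy(value = plan.value + 1) }
  object Double extends Rule[Plan] { def apply(plan: Plan): Plan = plan.copy(value = plan.value * 2) }

  private val rules: Seq[Rule[Plan]] = Seq(AddOne, Double)

  // Mirrors the foldLeft used by optimizeEntirePlan/optimizeQueryStage: rules run in list order.
  def optimize(plan: Plan): Plan = rules.foldLeft(plan) { case (current, rule) => rule(current) }

  def main(args: Array[String]): Unit = {
    println(optimize(Plan(3))) // Plan(8): (3 + 1) * 2, so swapping the rules would give Plan(7).
  }
}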
+
+  /**
+   * This method is called recursively to traverse the plan tree bottom-up. It returns two pieces
+   * of information: 1) the new plan after we insert query stages, and 2) whether or not the child
+   * query stages of the new plan are all ready.
+   *
+   * If the current plan is an exchange node, and all its child query stages are ready, we create
+   * a new query stage.
+   */
+  private def createQueryStages0(plan: SparkPlan): CreateStageResult = plan match {
+    case e: Exchange =>
+      val similarStages = stageCache.getOrElseUpdate(e.schema, mutable.Buffer.empty)
+      similarStages.find(_._1.sameResult(e)) match {
+        case Some((_, existingStage)) if conf.exchangeReuseEnabled =>
+          CreateStageResult(
+            newPlan = ReusedQueryStageExec(existingStage, e.output),
+            allChildStagesReady = readyStages.contains(existingStage.id))
+
+        case _ =>
+          val result = createQueryStages0(e.child)
+          val newPlan = e.withNewChildren(Seq(result.newPlan)).asInstanceOf[Exchange]
+          // Create a query stage only when all the child query stages are ready.
+          if (result.allChildStagesReady) {
+            val queryStage = createQueryStage(newPlan)
+            similarStages.append(e -> queryStage)
+            // We've created a new stage, which is obviously not ready yet.
+            CreateStageResult(newPlan = queryStage, allChildStagesReady = false)
+          } else {
+            CreateStageResult(newPlan = newPlan, allChildStagesReady = false)
+          }
+      }
+
+    case q: QueryStageExec =>
+      CreateStageResult(newPlan = q, allChildStagesReady = readyStages.contains(q.id))
+
+    case _ =>
+      if (plan.children.isEmpty) {
+        CreateStageResult(newPlan = plan, allChildStagesReady = true)
+      } else {
+        val results = plan.children.map(createQueryStages0)
+        CreateStageResult(
+          newPlan = plan.withNewChildren(results.map(_.newPlan)),
+          allChildStagesReady = results.forall(_.allChildStagesReady))
+      }
+  }
+
+  private def createQueryStage(e: Exchange): QueryStageExec = {
+    val optimizedPlan = optimizeQueryStage(e.child)
+    val queryStage = e match {
+      case s: ShuffleExchangeExec =>
+        ShuffleQueryStageExec(currentStageId, s.copy(child = optimizedPlan))
+      case b: BroadcastExchangeExec =>
+        BroadcastQueryStageExec(currentStageId, b.copy(child = optimizedPlan))
+    }
+    currentStageId += 1
+    post(MaterializeStage(queryStage))
+    queryStage
+  }
+
+  override protected def onError(e: Throwable): Unit = callback.onError(e)
+}
+
+case class CreateStageResult(newPlan: SparkPlan, allChildStagesReady: Boolean)
+
+object QueryStageManager {
+  private val executionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("QueryStageManager", 16))
+}
+
+trait QueryStageManagerCallback {
+  def onPlanUpdate(updatedPlan: SparkPlan): Unit
+  def onFinalPlan(finalPlan: SparkPlan): Unit
+  def onStageMaterializationFailed(stage: QueryStageExec, e: Throwable): Unit
+  def onError(e: Throwable): Unit
+}
+
+sealed trait QueryStageManagerEvent
+
+object Start extends QueryStageManagerEvent
+
+case class MaterializeStage(stage: QueryStageExec) extends QueryStageManagerEvent
+
+case class StageReady(stage: QueryStageExec) extends QueryStageManagerEvent
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/AssertChildFragmentsMaterialized.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/AssertChildStagesMaterialized.scala
similarity index 70%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/AssertChildFragmentsMaterialized.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/AssertChildStagesMaterialized.scala
index 3d1d014cf4b8..f522c12dd663 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/AssertChildFragmentsMaterialized.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/AssertChildStagesMaterialized.scala @@ -19,14 +19,14 @@ package org.apache.spark.sql.execution.adaptive.rule import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.adaptive.QueryFragmentExec +import org.apache.spark.sql.execution.adaptive.QueryStageExec -// A sanity check rule to make sure we are running query fragment optimizer rules on a sub-tree of -// query plan with all input fragments materialized. -object AssertChildFragmentsMaterialized extends Rule[SparkPlan] { +// A sanity check rule to make sure we are running query stage optimizer rules on a sub-tree of +// query plan with all child query stages materialized. +object AssertChildStagesMaterialized extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = plan.transform { - case q: QueryFragmentExec if !q.materialize().isCompleted => + case q: QueryStageExec if !q.materialize().isCompleted => throw new IllegalArgumentException( - s"The input fragments should all be materialized, but the below one is not.\n ${q.plan}") + s"The input query stages should all be materialized, but the below one is not.\n ${q.plan}") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/ReduceNumShufflePartitions.scala index 31edd21ff3f7..9a372bbb6a8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/ReduceNumShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/ReduceNumShufflePartitions.scala @@ -21,14 +21,13 @@ import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration.Duration import org.apache.spark.MapOutputStatistics -import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan, UnaryExecNode} -import org.apache.spark.sql.execution.adaptive.{QueryFragmentExec, ReusedQueryFragmentExec, ShuffleQueryFragmentExec} +import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.ThreadUtils @@ -56,18 +55,19 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = { val shuffleMetrics: Seq[MapOutputStatistics] = plan.collect { - case fragment: ShuffleQueryFragmentExec => + case fragment: ShuffleQueryStageExec => val metricsFuture = fragment.mapOutputStatisticsFuture assert(metricsFuture.isCompleted, "ShuffleQueryFragment should already be ready") ThreadUtils.awaitResult(metricsFuture, Duration.Zero) } - val allFragmentLeaves = plan.collectLeaves().forall { node => - node.isInstanceOf[QueryFragmentExec] || node.isInstanceOf[ReusedQueryFragmentExec] - } - - if (allFragmentLeaves) { - // ShuffleQueryFragment gives null mapOutputStatistics when the input RDD has 0 partitions, + if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])) { + // If not all leaf nodes are query stages, it's not safe to reduce the number of + // shuffle partitions, because we may break the 
assumption that all children of a Spark plan
+      // have the same number of output partitions.
+      plan
+    } else {
+      // `ShuffleQueryStageExec` gives null mapOutputStatistics when the input RDD has 0 partitions,
       // we should skip it when calculating the `partitionStartIndices`.
       val validMetrics = shuffleMetrics.filter(_ != null)
       if (validMetrics.nonEmpty) {
@@ -75,21 +75,14 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
         // This transformation adds new nodes, so we must use `transformUp` here.
         plan.transformUp {
           // even for shuffle exchange whose input RDD has 0 partition, we should still update its
-          // `partitionStartIndices`, so that all the leaf shuffles in a fragment have the same
+          // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
           // number of output partitions.
-          case fragment: ShuffleQueryFragmentExec =>
-            CoalescedShuffleReaderExec(fragment, partitionStartIndices)
-          case r@ReusedQueryFragmentExec(fragment: ShuffleQueryFragmentExec, output) =>
-            CoalescedShuffleReaderExec(r, partitionStartIndices)
+          case stage: ShuffleQueryStageExec =>
+            CoalescedShuffleReaderExec(stage, partitionStartIndices)
         }
       } else {
         plan
       }
-    } else {
-      // If not all leaf nodes are query fragments, it's not safe to reduce the number of
-      // shuffle partitions, because we may break the assumption that all children of a spark plan
-      // have same number of output partitions.
-      plan
-    }
   }
@@ -169,7 +162,7 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
 }
 case class CoalescedShuffleReaderExec(
-    child: SparkPlan,
+    child: ShuffleQueryStageExec,
     partitionStartIndices: Array[Int]) extends UnaryExecNode {
   override def output: Seq[Attribute] = child.output
@@ -182,13 +175,7 @@ case class CoalescedShuffleReaderExec(
   override protected def doExecute(): RDD[InternalRow] = {
     if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = child match {
-        case fragment: ShuffleQueryFragmentExec =>
-          fragment.plan.createShuffledRDD(Some(partitionStartIndices))
-        case ReusedQueryFragmentExec(fragment: ShuffleQueryFragmentExec, _) =>
-          fragment.plan.createShuffledRDD(Some(partitionStartIndices))
-        case _ => throw new SparkException("Invalid child for CoalescedShuffleReaderExec")
-      }
+      cachedShuffleRDD = child.plan.createShuffledRDD(Some(partitionStartIndices))
     }
     cachedShuffleRDD
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/RemoveRedundantShuffles.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/RemoveRedundantShuffles.scala
new file mode 100644
index 000000000000..1112869399d5
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/rule/RemoveRedundantShuffles.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive.rule
+
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+
+/**
+ * Remove shuffle nodes if the child's output partitioning is already the desired partitioning.
+ *
+ * This should be the last of the adaptive optimizer rules, as other rules may change a plan
+ * node's output partitioning and make some shuffle nodes unnecessary.
+ */
+object RemoveRedundantShuffles extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
+    case shuffle @ ShuffleExchangeExec(upper: HashPartitioning, child) =>
+      child.outputPartitioning match {
+        case lower: HashPartitioning if upper.semanticEquals(lower) => child
+        case _ => shuffle
+      }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
index 688e0d06cdb8..83deae2342a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
@@ -315,7 +315,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
         val finalPlan = agg.queryExecution.executedPlan
-          .asInstanceOf[AdaptiveSparkPlanExec].finalPlan.plan
+          .asInstanceOf[AdaptiveSparkPlanExec].finalPlan
         val shuffleReaders = finalPlan.collect {
           case reader: CoalescedShuffleReaderExec => reader
         }
@@ -362,7 +362,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
         val finalPlan = join.queryExecution.executedPlan
-          .asInstanceOf[AdaptiveSparkPlanExec].finalPlan.plan
+          .asInstanceOf[AdaptiveSparkPlanExec].finalPlan
         val shuffleReaders = finalPlan.collect {
           case reader: CoalescedShuffleReaderExec => reader
         }
@@ -414,7 +414,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
         val finalPlan = join.queryExecution.executedPlan
-          .asInstanceOf[AdaptiveSparkPlanExec].finalPlan.plan
+          .asInstanceOf[AdaptiveSparkPlanExec].finalPlan
         val shuffleReaders = finalPlan.collect {
           case reader: CoalescedShuffleReaderExec => reader
         }
@@ -466,7 +466,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
         val finalPlan = join.queryExecution.executedPlan
-          .asInstanceOf[AdaptiveSparkPlanExec].finalPlan.plan
+          .asInstanceOf[AdaptiveSparkPlanExec].finalPlan
         val shuffleReaders = finalPlan.collect {
           case reader: CoalescedShuffleReaderExec => reader
         }
@@ -509,7 +509,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
         // Then, let's make sure we do not reduce number of ppst shuffle partitions.
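Editor's note: the guard in RemoveRedundantShuffles hinges on `semanticEquals` between two `HashPartitioning`s — a shuffle can only be dropped when the child already hashes the same expressions into the same number of partitions. Below is a small, self-contained sketch of that check against spark-catalyst; the hand-built `key` attribute and the partition counts are illustrative values, not taken from this patch.

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.types.IntegerType

object HashPartitioningEqualitySketch {
  def main(args: Array[String]): Unit = {
    // In a real plan this attribute comes from the child operator's output.
    val key = AttributeReference("key", IntegerType)()

    val desired = HashPartitioning(Seq(key), 200)
    val actual = HashPartitioning(Seq(key), 200)
    val coarser = HashPartitioning(Seq(key), 100)

    println(desired.semanticEquals(actual))  // true: the extra shuffle would be redundant
    println(desired.semanticEquals(coarser)) // false: partition counts differ, keep the shuffle
  }
}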
val finalPlan = join.queryExecution.executedPlan - .asInstanceOf[AdaptiveSparkPlanExec].finalPlan.plan + .asInstanceOf[AdaptiveSparkPlanExec].finalPlan val shuffleReaders = finalPlan.collect { case reader: CoalescedShuffleReaderExec => reader } @@ -534,8 +534,8 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA // ReusedQueryFragment 0 val resultDf = df.join(df, "key").join(df, "key") val finalPlan = resultDf.queryExecution.executedPlan - .asInstanceOf[AdaptiveSparkPlanExec].finalPlan.plan - assert(finalPlan.collect { case p: ReusedQueryFragmentExec => p }.length == 2) + .asInstanceOf[AdaptiveSparkPlanExec].finalPlan + assert(finalPlan.collect { case p: ReusedQueryStageExec => p }.length == 2) assert(finalPlan.collect { case p: CoalescedShuffleReaderExec => p }.length == 3) checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil) @@ -549,26 +549,23 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA val resultDf2 = grouped.groupBy(col("key") + 1).max("value") .union(grouped.groupBy(col("key") + 2).max("value")) - val resultFragment = resultDf2.queryExecution.executedPlan + val finalPlan2 = resultDf2.queryExecution.executedPlan .asInstanceOf[AdaptiveSparkPlanExec].finalPlan // The result fragment has 2 children - val level1Fragments = resultFragment.plan.collect { case q: QueryFragmentExec => q } + val level1Fragments = finalPlan2.collect { case q: QueryStageExec => q } assert(level1Fragments.length == 2) val leafFragments = level1Fragments.flatMap { fragment => // All of the child fragments of result fragment have only one child fragment. - val children = fragment.plan.collect { - case q: QueryFragmentExec => q - case r: ReusedQueryFragmentExec => r.child - } + val children = fragment.plan.collect { case q: QueryStageExec => q } assert(children.length == 1) children } assert(leafFragments.length == 2) val reusedFragments = level1Fragments.flatMap { fragment => - fragment.plan.collect { case r: ReusedQueryFragmentExec => r } + fragment.plan.collect { case r: ReusedQueryStageExec => r } } assert(reusedFragments.length == 1)
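Editor's note: for context on what the CoalescedShuffleReaderExec assertions above are exercising, ReduceNumShufflePartitions derives `partitionStartIndices` by walking the post-shuffle partitions in order and starting a new coalesced partition whenever the running size would exceed a target. The following is a simplified, Spark-independent sketch of that idea; the 64MB target and the byte sizes are made-up illustration values, while the real rule reads partition sizes from MapOutputStatistics and its target size from SQLConf.

object CoalescePartitionsSketch {
  // Given the output bytes of each post-shuffle partition, return the indices at which a new
  // coalesced partition starts, greedily packing partitions up to targetSize bytes.
  def partitionStartIndices(bytesByPartition: Array[Long], targetSize: Long): Array[Int] = {
    val starts = scala.collection.mutable.ArrayBuffer(0)
    var currentSize = 0L
    for (i <- bytesByPartition.indices) {
      val size = bytesByPartition(i)
      if (i > 0 && currentSize + size > targetSize) {
        starts += i // the running group is full, begin a new coalesced partition at index i
        currentSize = size
      } else {
        currentSize += size
      }
    }
    starts.toArray
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    val sizes = Array(10L, 20L, 30L, 50L, 5L, 5L).map(_ * mb)
    // With a 64MB target this prints "0, 3": partitions [0,1,2] and [3,4,5] are merged.
    println(partitionStartIndices(sizes, 64L * mb).mkString(", "))
  }
}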