From 5dacbe3fb574aafef2a61d35f564b92f9428b1df Mon Sep 17 00:00:00 2001
From: erenavsarogullari
Date: Sun, 20 Nov 2022 22:33:01 -0800
Subject: [PATCH] SPARK-41214 - SubPlan metrics under InMemoryRelation are
 missed when AQE Cached DataFrame Support is enabled

---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 62a75e753455..4bedbfb81deb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -144,6 +144,9 @@ case class AdaptiveSparkPlanExec(
   @transient private val collapseCodegenStagesRule: Rule[SparkPlan] =
     CollapseCodegenStages()
 
+  private val isAQECachedDataFrameSupportEnabled = context.session.sessionState.conf.getConf(
+    SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)
+
   // A list of physical optimizer rules to be applied right after a new stage is created. The input
   // plan to these rules has exchange as its root node.
   private def postStageCreationRules(outputsColumnar: Boolean) = Seq(
@@ -345,6 +348,16 @@ case class AdaptiveSparkPlanExec(
     if (!isSubquery && currentPhysicalPlan.exists(_.subqueries.nonEmpty)) {
       getExecutionId.foreach(onUpdatePlan(_, Seq.empty))
     }
+
+    // Post final sub-plan changes under InMemoryRelation (IMR) now that AQE is supported under IMR
+    if (isAQECachedDataFrameSupportEnabled) {
+      val executionId = getExecutionId.orElse {
+        Option(context.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))
+          .map(_.toLong)
+      }
+      executionId.foreach(onUpdatePlan(_, Seq.empty))
+    }
+
     logOnLevel(s"Final plan:\n$currentPhysicalPlan")
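
For context (not part of the patch): a minimal sketch of how the changed code path can be exercised. It assumes SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING is backed by the spark.sql.optimizer.canChangeCachedPlanOutputPartitioning key; the object name, master URL, and column names below are illustrative only, not taken from the patch.

// Illustrative only: caches an adaptively planned join so that the final sub-plan
// under the InMemoryRelation is what the patched code posts for the SQL UI metrics.
import org.apache.spark.sql.SparkSession

object Spark41214Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-41214 sketch")
      // Assumed key for SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING,
      // i.e. the "AQE Cached DataFrame Support" flag checked by the patch.
      .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
      .getOrCreate()
    import spark.implicits._

    val left = Seq((1, "a"), (2, "b")).toDF("id", "v1")
    val right = Seq((1, "x"), (2, "y")).toDF("id", "v2")

    // The cached join is planned adaptively under the InMemoryRelation; its final
    // sub-plan is what this patch reports once the cache materializes.
    val cached = left.join(right, "id").cache()
    cached.count() // materializes the cache and finalizes the AQE sub-plan
    cached.count() // served from the InMemoryRelation

    spark.stop()
  }
}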