From 07aa2e2a2442d2c5187838abde9c0fae4e88314a Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Wed, 28 Dec 2022 14:15:17 -0800 Subject: [PATCH 1/7] init --- .../org/apache/spark/ui/static/webui.css | 12 + .../org/apache/spark/internal/config/UI.scala | 7 + .../spark/sql/execution/SQLExecution.scala | 27 ++ .../sql/execution/ui/AllExecutionsPage.scala | 269 +++++++++++++----- .../execution/ui/SQLAppStatusListener.scala | 6 +- .../sql/execution/ui/SQLAppStatusStore.scala | 1 + .../spark/sql/execution/ui/SQLListener.scala | 2 + .../spark/sql/execution/ui/SQLTab.scala | 3 + .../execution/ui/AllExecutionsPageSuite.scala | 45 +++ 9 files changed, 301 insertions(+), 71 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css index 0252bc800471..630b8c2afa5e 100755 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.css +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css @@ -187,6 +187,18 @@ pre { display: none; } +.sub-execution-list { + font-size:0.9rem; +} + +.sub-execution-list.collapsed { + display: none; +} + +.table-striped .sub-execution-list table tr { + background-color: inherit; +} + .description-input { overflow: hidden; text-overflow: ellipsis; diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala index d09620b8e34a..245caa58db77 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala @@ -229,4 +229,11 @@ private[spark] object UI { .stringConf .transform(_.toUpperCase(Locale.ROOT)) .createWithDefault("LOCAL") + + val UI_SQL_GROUP_SUB_EXECUTION_ENABLED = ConfigBuilder("spark.ui.sql.group.sub.execution.enabled") + .doc("Whether to group sub executions together in SQL UI when they belong to the same " + + "root execution") + .version("3.4.0") + .booleanConf + .createWithDefault(true) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index bb4cea474fe0..3784539e9295 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -30,6 +30,7 @@ import org.apache.spark.util.Utils object SQLExecution { val EXECUTION_ID_KEY = "spark.sql.execution.id" + val EXECUTION_ROOT_ID_KEY = "spark.sql.execution.root.id" private val _nextExecutionId = new AtomicLong(0) @@ -55,6 +56,28 @@ object SQLExecution { } } + /** + * Track the "root" SQL Execution Id for nested/sub queries. + * For the root execution, rootExecutionId == executionId. + */ + private def setRootExecutionId(sc: SparkContext, executionId: String): Unit = { + // The current execution is the root execution if the root execution ID is null + if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == null) { + sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, executionId) + } + } + + /** + * Unset the "root" SQL Execution Id once the "root" SQL execution completes. + */ + private def unsetRootExecutionId(sc: SparkContext, executionId: String): Unit = { + // The current execution is the root execution if rootExecutionId == executionId. + // Unset the property since the root sql execution is complete. 
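The two helpers above implement a claim/release protocol on a SparkContext thread-local property: the first execution on a thread claims the root slot, nested executions inherit it, and only the claimant clears it. A minimal self-contained sketch of the same pattern, assuming only the public local-property API (`runQuery` and the key name are illustrative stand-ins, not Spark internals):

```scala
import org.apache.spark.SparkContext

// Hedged sketch: the first query on a thread becomes the root; nested
// queries inherit its ID; only the owner clears the slot on the way out.
def runQuery[T](sc: SparkContext, executionId: Long)(body: => T): T = {
  val key = "demo.sql.execution.root.id" // illustrative key, not Spark's
  if (sc.getLocalProperty(key) == null) {
    sc.setLocalProperty(key, executionId.toString) // this query is the root
  }
  try body finally {
    if (sc.getLocalProperty(key) == executionId.toString) {
      sc.setLocalProperty(key, null) // root finished: clear for the next query
    }
  }
}
// runQuery(sc, 1) { runQuery(sc, 2) { ... } } leaves the property null
// afterwards, and the inner call sees "1" as its root.
```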
+ if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == executionId) { + sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, null) + } + } + /** * Wrap an action that will execute "queryExecution" to track all Spark jobs in the body so that * we can connect them with an execution. @@ -67,6 +90,8 @@ object SQLExecution { val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY) val executionId = SQLExecution.nextExecutionId sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString) + setRootExecutionId(sc, executionId.toString) + val rootExecutionId = sc.getLocalProperty(EXECUTION_ROOT_ID_KEY).toLong executionIdToQueryExecution.put(executionId, queryExecution) try { // sparkContext.getCallSite() would first try to pick up any call site that was previously @@ -98,6 +123,7 @@ object SQLExecution { try { sc.listenerBus.post(SparkListenerSQLExecutionStart( executionId = executionId, + rootExecutionId = rootExecutionId, description = desc, details = callSite.longForm, physicalPlanDescription = queryExecution.explainString(planDescriptionMode), @@ -140,6 +166,7 @@ object SQLExecution { } finally { executionIdToQueryExecution.remove(executionId) sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId) + unsetRootExecutionId(sc, oldExecutionId) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index a7adc9431c35..4c625b4da06a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -26,6 +26,7 @@ import scala.xml.{Node, NodeSeq} import org.apache.spark.JobExecutionStatus import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.UI.UI_SQL_GROUP_SUB_EXECUTION_ENABLED import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage} import org.apache.spark.util.Utils @@ -33,32 +34,57 @@ import org.apache.spark.util.Utils private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging { private val sqlStore = parent.sqlStore + private val groupSubExecutionEnabled = parent.conf.get(UI_SQL_GROUP_SUB_EXECUTION_ENABLED) override def render(request: HttpServletRequest): Seq[Node] = { val currentTime = System.currentTimeMillis() val running = new mutable.ArrayBuffer[SQLExecutionUIData]() val completed = new mutable.ArrayBuffer[SQLExecutionUIData]() val failed = new mutable.ArrayBuffer[SQLExecutionUIData]() + val executionIdToSubExecutions = + new mutable.HashMap[Long, mutable.ArrayBuffer[SQLExecutionUIData]]() sqlStore.executionsList().foreach { e => - if (e.errorMessage.isDefined) { - if (e.errorMessage.get.isEmpty) { - completed += e + def processExecution(e: SQLExecutionUIData): Unit = { + if (e.errorMessage.isDefined) { + if (e.errorMessage.get.isEmpty) { + completed += e + } else { + failed += e + } + } else if (e.completionTime.isEmpty) { + running += e } else { - failed += e + // When `completionTime` is present, it means the query execution is completed and + // `errorMessage` should be present as well. However, events generated by old versions of + // Spark do not have the `errorMessage` field. We have to check the status of this query + // execution's jobs. 
+          val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
+          if (isFailed) {
+            failed += e
+          } else {
+            completed += e
+          }
+        }
+      }
+      // group the sub execution only if the root execution will be displayed (i.e. not missing)
+      if (groupSubExecutionEnabled &&
+          e.executionId != e.rootExecutionId &&
+          executionIdToSubExecutions.contains(e.rootExecutionId)) {
+        executionIdToSubExecutions.get(e.rootExecutionId).foreach { executions =>
+          executions += e
         }
-      } else if (e.completionTime.isEmpty) {
-        running += e
       } else {
-        // When `completionTime` is present, it means the query execution is completed and
-        // `errorMessage` should be present as well. However, events generated by old versions of
-        // Spark do not have the `errorMessage` field. We have to check the status of this query
-        // execution's jobs.
-        val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
-        if (isFailed) {
-          failed += e
+        if (groupSubExecutionEnabled) {
+          // add the execution id to indicate it'll be displayed as a root, so executions with
+          // the same root execution id will be added here and displayed as sub executions.
+          // If the root execution is not found (e.g. event loss), then the sub executions will
+          // be displayed in the root list instead.
+          // NOTE: this code assumes the root execution id always comes first, which is
+          // guaranteed by `sqlStore.executionsList()`.
+          executionIdToSubExecutions(e.executionId) = new mutable.ArrayBuffer[SQLExecutionUIData]()
         } else {
-          completed += e
+          processExecution(e)
         }
       }
     }
@@ -68,7 +94,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging {

     if (running.nonEmpty) {
       val runningPageTable =
-        executionsTable(request, "running", running.toSeq, currentTime, true, true, true)
+        executionsTable(request, "running", running.toSeq, executionIdToSubExecutions.toMap,
+          currentTime, true, true, true)
       _content ++=
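The grouping pass above makes a single ordered scan: every execution shown at the top level registers a bucket, and a sub-execution is nested only when its root's bucket already exists; otherwise it falls back to the top-level list. A hedged, self-contained sketch of that invariant, with `Exec` as a stand-in for `SQLExecutionUIData`:

```scala
import scala.collection.mutable

// Stand-in for SQLExecutionUIData: just the two IDs the grouping needs.
case class Exec(id: Long, rootId: Long)

def group(orderedById: Seq[Exec]): (Seq[Exec], Map[Long, Seq[Exec]]) = {
  val topLevel = mutable.ArrayBuffer[Exec]()
  val subs = mutable.HashMap[Long, mutable.ArrayBuffer[Exec]]()
  orderedById.foreach { e =>
    if (e.id != e.rootId && subs.contains(e.rootId)) {
      subs(e.rootId) += e                // nest under an already-seen root
    } else {
      topLevel += e                      // a root, or an orphan whose root is missing
      subs(e.id) = mutable.ArrayBuffer() // let later children attach here
    }
  }
  (topLevel.toSeq, subs.map { case (k, v) => k -> v.toSeq }.toMap)
}

// Mirrors the new test's scenario: 0 and 1 share root 0, 2 references a
// missing root 100 and stays top-level.
// group(Seq(Exec(0, 0), Exec(1, 0), Exec(2, 100)))
//   => top level: Exec(0,0) and Exec(2,100); subs(0) == Seq(Exec(1,0))
```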
@@ -210,7 +241,9 @@ private[ui] class ExecutionPagedTable( currentTime: Long, showRunningJobs: Boolean, showSucceededJobs: Boolean, - showFailedJobs: Boolean) extends PagedTable[ExecutionTableRowData] { + showFailedJobs: Boolean, + subExecutions: Map[Long, Seq[SQLExecutionUIData]] = Map.empty) + extends PagedTable[ExecutionTableRowData] { private val (sortColumn, desc, pageSize) = getTableParameters(request, executionTag, "ID") @@ -224,11 +257,14 @@ private[ui] class ExecutionPagedTable( desc, showRunningJobs, showSucceededJobs, - showFailedJobs) + showFailedJobs, + subExecutions) private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, executionTag)}" + private val showSubExecutions = subExecutions.nonEmpty + override def tableId: String = s"$executionTag-table" override def tableCssClass: String = @@ -250,32 +286,39 @@ private[ui] class ExecutionPagedTable( override def goButtonFormPath: String = s"$parameterPath&$executionTag.sort=$encodedSortColumn&$executionTag.desc=$desc#$tableHeaderId" - override def headers: Seq[Node] = { - // Information for each header: title, sortable, tooltip - val executionHeadersAndCssClasses: Seq[(String, Boolean, Option[String])] = - Seq( - ("ID", true, None), - ("Description", true, None), - ("Submitted", true, None), - ("Duration", true, Some("Time from query submission to completion (or if still executing," + - "time since submission)"))) ++ { - if (showRunningJobs && showSucceededJobs && showFailedJobs) { - Seq( - ("Running Job IDs", true, None), - ("Succeeded Job IDs", true, None), - ("Failed Job IDs", true, None)) - } else if (showSucceededJobs && showFailedJobs) { - Seq( - ("Succeeded Job IDs", true, None), - ("Failed Job IDs", true, None)) - } else { - Seq(("Job IDs", true, None)) - } + // Information for each header: title, sortable, tooltip + private val headerInfo: Seq[(String, Boolean, Option[String])] = { + Seq( + ("ID", true, None), + ("Description", true, None), + ("Submitted", true, None), + ("Duration", true, Some("Time from query submission to completion (or if still executing," + + "time since submission)"))) ++ { + if (showRunningJobs && showSucceededJobs && showFailedJobs) { + Seq( + ("Running Job IDs", true, None), + ("Succeeded Job IDs", true, None), + ("Failed Job IDs", true, None)) + } else if (showSucceededJobs && showFailedJobs) { + Seq( + ("Succeeded Job IDs", true, None), + ("Failed Job IDs", true, None)) + } else { + Seq(("Job IDs", true, None)) + } + } ++ { + if (showSubExecutions) { + Seq(("Sub Execution IDs", true, None)) + } else { + Nil } + } + } - isSortColumnValid(executionHeadersAndCssClasses, sortColumn) + override def headers: Seq[Node] = { + isSortColumnValid(headerInfo, sortColumn) - headerRow(executionHeadersAndCssClasses, desc, pageSize, sortColumn, parameterPath, + headerRow(headerInfo, desc, pageSize, sortColumn, parameterPath, executionTag, tableHeaderId) } @@ -290,35 +333,112 @@ private[ui] class ExecutionPagedTable( } } - - - {executionUIData.executionId.toString} - - - {descriptionCell(executionUIData)} - - - {UIUtils.formatDate(submissionTime)} - - - {UIUtils.formatDuration(duration)} - - {if (showRunningJobs) { + def executionLinks(executionData: Seq[Long]): Seq[Node] = { + val details = if (executionData.nonEmpty) { + val onClickScript = "this.parentNode.parentNode.nextElementSibling.nextElementSibling" + + ".classList.toggle('collapsed')" + + +details + + } else { + Nil + } + +
{ + executionData.map { executionId => + [{executionId.toString}] + } + }
++ details + } + + val baseRow: Seq[Node] = { + - {jobLinks(executionTableRow.runningJobData)} + {executionUIData.executionId.toString} - }} - {if (showSucceededJobs) { - {jobLinks(executionTableRow.completedJobData)} + {descriptionCell(executionUIData)} - }} - {if (showFailedJobs) { - - {jobLinks(executionTableRow.failedJobData)} + + {UIUtils.formatDate(submissionTime)} - }} - + + {UIUtils.formatDuration(duration)} + + {if (showRunningJobs) { + + {jobLinks(executionTableRow.runningJobData)} + + }} + {if (showSucceededJobs) { + + {jobLinks(executionTableRow.completedJobData)} + + }} + {if (showFailedJobs) { + + {jobLinks(executionTableRow.failedJobData)} + + }} + {if (showSubExecutions) { + + {executionLinks(executionTableRow.subExecutionData.map(_.executionUIData.executionId))} + + }} + + } + + val subRow: Seq[Node] = {if (executionTableRow.subExecutionData.nonEmpty) { + + + + + + + + {headerInfo.dropRight(1).map(info => )} + + + + { + executionTableRow.subExecutionData.map { rowData => + val executionUIData = rowData.executionUIData + val submissionTime = executionUIData.submissionTime + val duration = rowData.duration + + + + + {if (showRunningJobs) { + + }}{if (showSucceededJobs) { + + }}{if (showFailedJobs) { + + }} + + } + } + +
{info._1}
+ {executionUIData.executionId.toString} + + {descriptionCell(executionUIData)} + + {UIUtils.formatDate(submissionTime)} + + {UIUtils.formatDuration(duration)} + + {jobLinks(rowData.runningJobData)} + + {jobLinks(rowData.completedJobData)} + + {jobLinks(rowData.failedJobData)} +
+ + + } else { Nil }} + baseRow ++ subRow } private def descriptionCell(execution: SQLExecutionUIData): Seq[Node] = { @@ -358,7 +478,8 @@ private[ui] class ExecutionTableRowData( val executionUIData: SQLExecutionUIData, val runningJobData: Seq[Int], val completedJobData: Seq[Int], - val failedJobData: Seq[Int]) + val failedJobData: Seq[Int], + val subExecutionData: Seq[ExecutionTableRowData]) private[ui] class ExecutionDataSource( @@ -369,7 +490,9 @@ private[ui] class ExecutionDataSource( desc: Boolean, showRunningJobs: Boolean, showSucceededJobs: Boolean, - showFailedJobs: Boolean) extends PagedDataSource[ExecutionTableRowData](pageSize) { + showFailedJobs: Boolean, + subExecutions: Map[Long, Seq[SQLExecutionUIData]]) + extends PagedDataSource[ExecutionTableRowData](pageSize) { // Convert ExecutionData to ExecutionTableRowData which contains the final contents to show // in the table so that we can avoid creating duplicate contents during sorting the data @@ -401,12 +524,18 @@ private[ui] class ExecutionDataSource( }.map { case (jobId, _) => jobId }.toSeq.sorted } else Seq.empty + val executions = subExecutions.get(executionUIData.executionId) match { + case Some(executions) => executions.map(executionRow) + case _ => Seq.empty + } + new ExecutionTableRowData( duration, executionUIData, runningJobData, completedJobData, - failedJobData) + failedJobData, + executions) } /** Return Ordering according to sortColumn and desc. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 99718b63353a..5d07f3b81d9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -90,6 +90,7 @@ class SQLAppStatusListener( // data corresponding to the execId. 
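Since `SparkListenerSQLExecutionStart` now carries `rootExecutionId` on the listener bus, any listener can reconstruct the parent/child relation without touching the UI store. A sketch of such a consumer, assuming the post-patch event shape (`SubExecutionCounter` itself is illustrative, not part of this change):

```scala
import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

// Counts how many sub-executions each root execution spawned.
class SubExecutionCounter extends SparkListener {
  val childrenPerRoot = mutable.Map[Long, Int]().withDefaultValue(0)

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart if e.executionId != e.rootExecutionId =>
      childrenPerRoot(e.rootExecutionId) += 1 // a nested execution started
    case _ => // ignore other events
  }
}
// Register on a running application:
// spark.sparkContext.addSparkListener(new SubExecutionCounter)
```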
val sqlStoreData = kvstore.read(classOf[SQLExecutionUIData], executionId) val executionData = new LiveExecutionData(executionId) + executionData.rootExecutionId = sqlStoreData.rootExecutionId executionData.description = sqlStoreData.description executionData.details = sqlStoreData.details executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription @@ -339,7 +340,7 @@ class SQLAppStatusListener( } private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { - val SparkListenerSQLExecutionStart(executionId, description, details, + val SparkListenerSQLExecutionStart(executionId, rootExecutionId, description, details, physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs) = event val planGraph = SparkPlanGraph(sparkPlanInfo) @@ -354,6 +355,7 @@ class SQLAppStatusListener( kvstore.write(graphToStore) val exec = getOrCreateExecution(executionId) + exec.rootExecutionId = rootExecutionId exec.description = description exec.details = details exec.physicalPlanDescription = physicalPlanDescription @@ -482,6 +484,7 @@ class SQLAppStatusListener( private class LiveExecutionData(val executionId: Long) extends LiveEntity { + var rootExecutionId: Long = _ var description: String = null var details: String = null var physicalPlanDescription: String = null @@ -504,6 +507,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity { override protected def doUpdate(): Any = { new SQLExecutionUIData( executionId, + rootExecutionId, description, details, physicalPlanDescription, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 25cb8c2f0434..ffd8690bcccb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -83,6 +83,7 @@ class SQLAppStatusStore( class SQLExecutionUIData( @KVIndexParam val executionId: Long, + val rootExecutionId: Long, val description: String, val details: String, val physicalPlanDescription: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index ce665e118938..b931b4fcde1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -43,6 +43,8 @@ case class SparkListenerSQLAdaptiveSQLMetricUpdates( @DeveloperApi case class SparkListenerSQLExecutionStart( executionId: Long, + // if the execution is a root, then rootExecutionId == executionId + rootExecutionId: Long, description: String, details: String, physicalPlanDescription: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala index 8a258fb12197..5aa6ddbdb7af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala @@ -17,12 +17,15 @@ package org.apache.spark.sql.execution.ui +import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.ui.{SparkUI, SparkUITab} class SQLTab(val sqlStore: SQLAppStatusStore, sparkUI: SparkUI) extends SparkUITab(sparkUI, "SQL") with Logging { + def conf: SparkConf = sparkUI.conf + override val name = "SQL 
/ DataFrame" val parent = sparkUI diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala index 495a473a0141..58e704ecd877 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala @@ -27,6 +27,7 @@ import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} import org.scalatest.BeforeAndAfter import org.apache.spark.SparkConf +import org.apache.spark.internal.config.UI.UI_SQL_GROUP_SUB_EXECUTION_ENABLED import org.apache.spark.scheduler.{JobFailed, SparkListenerJobEnd, SparkListenerJobStart} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution} @@ -119,6 +120,50 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter { assert(html.contains("duration")) } + test("group sub executions") { + val statusStore = createStatusStore + val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS) + val request = mock(classOf[HttpServletRequest]) + + val sparkConf = new SparkConf(false).set(UI_SQL_GROUP_SUB_EXECUTION_ENABLED, true) + when(tab.conf).thenReturn(sparkConf) + when(tab.sqlStore).thenReturn(statusStore) + when(tab.appName).thenReturn("testing") + when(tab.headerTabs).thenReturn(Seq.empty) + + val listener = statusStore.listener.get + val page = new AllExecutionsPage(tab) + val df = createTestDataFrame + listener.onOtherEvent(SparkListenerSQLExecutionStart( + 0, + 0, + "test", + "test", + df.queryExecution.toString, + SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + System.currentTimeMillis())) + listener.onOtherEvent(SparkListenerSQLExecutionStart( + 1, + 0, + "test", + "test", + df.queryExecution.toString, + SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + System.currentTimeMillis())) + // sub execution has a missing root execution + listener.onOtherEvent(SparkListenerSQLExecutionStart( + 2, + 100, + "test", + "test", + df.queryExecution.toString, + SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + System.currentTimeMillis())) + val html = page.render(request).toString().toLowerCase(Locale.ROOT) + assert(html.contains("sub execution ids") && html.contains("sub-execution-list")) + // sub execution should still be displayed if the root execution is missing + assert(html.contains("id=2")) + } private def createStatusStore: SQLAppStatusStore = { val conf = sparkContext.conf From bc39922d938b1bc39117cd1a08d71b67af700f6f Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Wed, 28 Dec 2022 16:02:31 -0800 Subject: [PATCH 2/7] fix test --- .../scala/org/apache/spark/internal/config/UI.scala | 2 +- .../spark/sql/execution/ui/AllExecutionsPage.scala | 3 +-- .../protobuf/sql/SQLExecutionUIDataSerializer.scala | 1 + .../spark/sql/execution/SQLJsonProtocolSuite.scala | 2 +- .../history/SQLEventFilterBuilderSuite.scala | 2 +- .../history/SQLLiveEntitiesEventFilterSuite.scala | 4 ++-- .../sql/execution/ui/AllExecutionsPageSuite.scala | 5 +++++ .../execution/ui/MetricsAggregationBenchmark.scala | 1 + .../sql/execution/ui/SQLAppStatusListenerSuite.scala | 12 +++++++++++- .../spark/status/api/v1/sql/SqlResourceSuite.scala | 1 + .../sql/KVStoreProtobufSerializerSuite.scala | 2 ++ 11 files changed, 27 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala 
b/core/src/main/scala/org/apache/spark/internal/config/UI.scala index 245caa58db77..76e7822a1213 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala @@ -235,5 +235,5 @@ private[spark] object UI { "root execution") .version("3.4.0") .booleanConf - .createWithDefault(true) + .createWithDefault(false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index 4c625b4da06a..4ee8aabf8bac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -83,9 +83,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L // NOTE: this code assumes the root execution id always comes first. which is guaranteed // by the `sqlStore.executionsList()` executionIdToSubExecutions(e.executionId) = new mutable.ArrayBuffer[SQLExecutionUIData]() - } else { - processExecution(e) } + processExecution(e) } } diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala index 77b6f8925cb6..12fb2480de14 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala @@ -74,6 +74,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe { new SQLExecutionUIData( executionId = ui.getExecutionId, + rootExecutionId = ui.getExecutionId, description = ui.getDescription, details = ui.getDetails, physicalPlanDescription = ui.getPhysicalPlanDescription, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala index 9f2b08a65eaf..e9d98ee97157 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala @@ -59,7 +59,7 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession { val reconstructedEvent = JsonProtocol.sparkEventFromJson(SQLExecutionStartJsonString) if (newExecutionStartEvent) { - val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail", + val expectedEvent = SparkListenerSQLExecutionStart(0, 0, "test desc", "test detail", "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0, Map("k1" -> "v1")) assert(reconstructedEvent == expectedEvent) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala index 090c149886a8..42b27bd9f28e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala @@ -57,7 +57,7 @@ class SQLEventFilterBuilderSuite extends SparkFunSuite { } // Start SQL Execution - listener.onOtherEvent(SparkListenerSQLExecutionStart(1, "desc1", "details1", "plan", + listener.onOtherEvent(SparkListenerSQLExecutionStart(1, 1, "desc1", "details1", "plan", new 
SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time, Map.empty)) time += 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala index 724df8ebe8bf..f1b77e502dfe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala @@ -41,7 +41,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite { val acceptFn = filter.acceptFn().lift // Verifying with finished SQL execution 1 - assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, "description1", "details1", + assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, 1, "description1", "details1", "plan", null, 0, Map.empty))) assert(Some(false) === acceptFn(SparkListenerSQLExecutionEnd(1, 0))) assert(Some(false) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(1, "plan", null))) @@ -88,7 +88,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite { } // Verifying with live SQL execution 2 - assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, "description2", "details2", + assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, 2, "description2", "details2", "plan", null, 0, Map.empty))) assert(Some(true) === acceptFn(SparkListenerSQLExecutionEnd(2, 0))) assert(Some(true) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(2, "plan", null))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala index 58e704ecd877..6f8a347d0b3b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala @@ -59,6 +59,7 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter { when(tab.sqlStore).thenReturn(statusStore) val request = mock(classOf[HttpServletRequest]) + when(tab.conf).thenReturn(new SparkConf(false)) when(tab.appName).thenReturn("testing") when(tab.headerTabs).thenReturn(Seq.empty) @@ -73,6 +74,7 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter { when(tab.sqlStore).thenReturn(statusStore) val request = mock(classOf[HttpServletRequest]) + when(tab.conf).thenReturn(new SparkConf(false)) when(tab.appName).thenReturn("testing") when(tab.headerTabs).thenReturn(Seq.empty) @@ -81,6 +83,7 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter { val page = new AllExecutionsPage(tab) val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( + 0, 0, "test", "test", @@ -108,6 +111,7 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter { val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS) val request = mock(classOf[HttpServletRequest]) + when(tab.conf).thenReturn(new SparkConf(false)) when(tab.sqlStore).thenReturn(statusStore) when(tab.appName).thenReturn("testing") when(tab.headerTabs).thenReturn(Seq.empty) @@ -194,6 +198,7 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter { Seq(0, 1).foreach { executionId => val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( + executionId, executionId, "test", "test", diff --git 
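For reference, the mock wiring these suites rely on, gathered in one place; `statusStore` is the store built by the suite's `createStatusStore` helper, and the `tab.conf` stub is only possible because of the accessor added in this patch. A sketch of suite-internal setup, not new API:

```scala
import javax.servlet.http.HttpServletRequest
import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.UI.UI_SQL_GROUP_SUB_EXECUTION_ENABLED

// Suite-style setup: stub every accessor the page touches during render().
val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
when(tab.conf).thenReturn(new SparkConf(false).set(UI_SQL_GROUP_SUB_EXECUTION_ENABLED, true))
when(tab.sqlStore).thenReturn(statusStore) // statusStore from createStatusStore
when(tab.appName).thenReturn("testing")
when(tab.headerTabs).thenReturn(Seq.empty)

val request = mock(classOf[HttpServletRequest])
val html = new AllExecutionsPage(tab).render(request).toString()
```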
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala index d994126fe636..3b9efb180578 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala @@ -74,6 +74,7 @@ object MetricsAggregationBenchmark extends BenchmarkBase { val idgen = new AtomicInteger() val executionId = idgen.incrementAndGet() val executionStart = SparkListenerSQLExecutionStart( + executionId, executionId, getClass().getName(), getClass().getName(), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 1cecb2a6ba94..ed282cb13d15 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -195,6 +195,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils }.toMap listener.onOtherEvent(SparkListenerSQLExecutionStart( + executionId, executionId, "test", "test", @@ -348,7 +349,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils val listener = new SparkListener { override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { - case SparkListenerSQLExecutionStart(_, _, _, planDescription, _, _, _) => + case SparkListenerSQLExecutionStart(_, _, _, _, planDescription, _, _, _) => assert(expected.forall(planDescription.contains)) checkDone = true case _ => // ignore other events @@ -385,6 +386,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils val executionId = 0 val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( + executionId, executionId, "test", "test", @@ -415,6 +417,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils val executionId = 0 val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( + executionId, executionId, "test", "test", @@ -456,6 +459,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils val executionId = 0 val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( + executionId, executionId, "test", "test", @@ -486,6 +490,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils val executionId = 0 val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( + executionId, executionId, "test", "test", @@ -517,6 +522,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils val executionId = 0 val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( + executionId, executionId, "test", "test", @@ -657,6 +663,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils // Start execution 1 and execution 2 time += 1 listener.onOtherEvent(SparkListenerSQLExecutionStart( + 1, 1, "test", "test", @@ -666,6 +673,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils Map.empty)) time += 1 listener.onOtherEvent(SparkListenerSQLExecutionStart( + 2, 2, "test", "test", @@ -683,6 +691,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils // Start 
execution 3 and execution 2 should be evicted. time += 1 listener.onOtherEvent(SparkListenerSQLExecutionStart( + 3, 3, "test", "test", @@ -719,6 +728,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils .allNodes.flatMap(_.metrics.map(_.accumulatorId)) listener.onOtherEvent(SparkListenerSQLExecutionStart( + executionId, executionId, "test", "test", diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala index 336779988fc8..bd2b67364db0 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala @@ -82,6 +82,7 @@ object SqlResourceSuite { new SQLExecutionUIData( executionId = 0, + rootExecutionId = 0, description = DESCRIPTION, details = "", physicalPlanDescription = PLAN_DESCRIPTION, diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala index 5f1cd812d974..f096201b43f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala @@ -52,6 +52,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { val input1 = new SQLExecutionUIData( executionId = templateData.executionId, + rootExecutionId = templateData.rootExecutionId, description = templateData.description, details = templateData.details, physicalPlanDescription = templateData.physicalPlanDescription, @@ -71,6 +72,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { val input2 = new SQLExecutionUIData( executionId = templateData.executionId, + rootExecutionId = templateData.rootExecutionId, description = templateData.description, details = templateData.details, physicalPlanDescription = templateData.physicalPlanDescription, From 79ec0510f7d1abed09e560b1211d392ca03a78df Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Thu, 29 Dec 2022 12:07:53 -0800 Subject: [PATCH 3/7] address comments --- .../spark/sql/execution/SQLExecution.scala | 2 +- .../sql/execution/ui/AllExecutionsPage.scala | 20 +++++++++---------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 3784539e9295..6883aefec1bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -166,7 +166,7 @@ object SQLExecution { } finally { executionIdToQueryExecution.remove(executionId) sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId) - unsetRootExecutionId(sc, oldExecutionId) + unsetRootExecutionId(sc, executionId.toString) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index 4ee8aabf8bac..690350962343 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -69,11 +69,9 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") 
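The one-line change in this commit is its crux: the unset step must compare against the current `executionId`, not `oldExecutionId`, which is null for a root query, so the comparison would never match and the root ID would leak into later queries on the same thread. A worked trace with a plain map standing in for the context's local properties:

```scala
import scala.collection.mutable

// Stand-in for SparkContext local properties on one scheduler thread.
val props = mutable.Map[String, String]()

// Fixed semantics: only the execution that owns the root slot clears it.
def unsetRoot(executionId: Long): Unit =
  if (props.get("root.id").contains(executionId.toString)) props.remove("root.id")

props("root.id") = "1" // root query (id 1) claims the slot
unsetRoot(2)           // nested query (id 2) finishes: no-op, root still owns it
unsetRoot(1)           // root finishes: slot cleared for the next query
assert(!props.contains("root.id"))
// With the pre-fix argument (oldExecutionId, null for the root), the final
// compare would never match and "root.id" would leak to unrelated queries.
```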
with L } // group the sub execution only if the root execution will be displayed (i.e. not missing) if (groupSubExecutionEnabled && - e.executionId != e.rootExecutionId && - executionIdToSubExecutions.contains(e.rootExecutionId)) { - executionIdToSubExecutions.get(e.rootExecutionId).foreach { executions => - executions += e - } + e.executionId != e.rootExecutionId && + executionIdToSubExecutions.contains(e.rootExecutionId)) { + executionIdToSubExecutions(e.rootExecutionId) += e } else { if (groupSubExecutionEnabled) { // add the execution id to indicate it'll be displayed as root, so the executions with @@ -93,8 +91,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L if (running.nonEmpty) { val runningPageTable = - executionsTable(request, "running", running.toSeq, executionIdToSubExecutions.toMap, - currentTime, true, true, true) + executionsTable(request, "running", running.toSeq, + executionIdToSubExecutions.mapValues(_.toSeq).toMap, currentTime, true, true, true) _content ++= Date: Tue, 3 Jan 2023 19:11:17 -0800 Subject: [PATCH 4/7] switch flag --- core/src/main/scala/org/apache/spark/internal/config/UI.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala index 76e7822a1213..245caa58db77 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala @@ -235,5 +235,5 @@ private[spark] object UI { "root execution") .version("3.4.0") .booleanConf - .createWithDefault(false) + .createWithDefault(true) } From 4d3c15a6d4e1360f4c0f7fbce74d96fda5d2aed8 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Tue, 3 Jan 2023 21:13:18 -0800 Subject: [PATCH 5/7] code style and address comments --- .../org/apache/spark/internal/config/UI.scala | 2 +- .../sql/execution/ui/AllExecutionsPage.scala | 77 ++++++++++--------- 2 files changed, 43 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala index 245caa58db77..97d6a47729c9 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala @@ -230,7 +230,7 @@ private[spark] object UI { .transform(_.toUpperCase(Locale.ROOT)) .createWithDefault("LOCAL") - val UI_SQL_GROUP_SUB_EXECUTION_ENABLED = ConfigBuilder("spark.ui.sql.group.sub.execution.enabled") + val UI_SQL_GROUP_SUB_EXECUTION_ENABLED = ConfigBuilder("spark.ui.sql.groupSubExecutionEnabled") .doc("Whether to group sub executions together in SQL UI when they belong to the same " + "root execution") .version("3.4.0") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index 690350962343..cd8f31b3c21d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -290,7 +290,7 @@ private[ui] class ExecutionPagedTable( ("Description", true, None), ("Submitted", true, None), ("Duration", true, Some("Time from query submission to completion (or if still executing," + - "time since submission)"))) ++ { + " time since submission)"))) ++ { if (showRunningJobs && showSucceededJobs && showFailedJobs) { Seq( ("Running Job 
IDs", true, None), @@ -341,11 +341,13 @@ private[ui] class ExecutionPagedTable( Nil } -
{ - executionData.map { executionId => - [{executionId.toString}] +
+ { + executionData.map { executionId => + [{executionId.toString}] + } } - }
++ details +
++ details } val baseRow: Seq[Node] = { @@ -385,19 +387,19 @@ private[ui] class ExecutionPagedTable( } - val subRow: Seq[Node] = {if (executionTableRow.subExecutionData.nonEmpty) { + val subRow: Seq[Node] = if (executionTableRow.subExecutionData.nonEmpty) { - - - - - - - {headerInfo.dropRight(1).map(info => )} - - - - { + + + - - } else { Nil }} + } + +
{info._1}
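One detail worth noting in the sub-table markup: the nested header reuses the parent's `headerInfo` minus its trailing "Sub Execution IDs" column, which would be meaningless one level down. A small scala-xml sketch of that reuse, where the `<tr>`/`<th>` wrappers are assumptions about the surrounding literals rather than verbatim page code:

```scala
// Header metadata: (title, sortable, tooltip), as in ExecutionPagedTable.
val headerInfo: Seq[(String, Boolean, Option[String])] =
  Seq(("ID", true, None), ("Description", true, None), ("Sub Execution IDs", true, None))

// Drop the last column before rendering the nested table's header row.
val subTableHeader = <tr>{headerInfo.dropRight(1).map(info => <th>{info._1}</th>)}</tr>
// => <tr><th>ID</th><th>Description</th></tr>
```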
+ + + } else { + Nil + } baseRow ++ subRow } From 2fc7eba60dc9484685e91a468cae7c188d91f003 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Mon, 9 Jan 2023 21:59:36 -0800 Subject: [PATCH 6/7] address comments --- .../protobuf/org/apache/spark/status/protobuf/store_types.proto | 1 + core/src/main/resources/org/apache/spark/ui/static/webui.css | 2 +- core/src/main/scala/org/apache/spark/internal/config/UI.scala | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index 1c3e5bfc49ae..e17ca36674c2 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -397,6 +397,7 @@ message SQLExecutionUIData { repeated int64 stages = 11; bool metric_values_is_null = 12; map metric_values = 13; + optional int64 root_execution_id = 14; } message SparkPlanGraphNode { diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css index 630b8c2afa5e..f952f86503e3 100755 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.css +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css @@ -188,7 +188,7 @@ pre { } .sub-execution-list { - font-size:0.9rem; + font-size: 0.9rem; } .sub-execution-list.collapsed { diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala index 97d6a47729c9..a32e60de2a45 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala @@ -230,7 +230,7 @@ private[spark] object UI { .transform(_.toUpperCase(Locale.ROOT)) .createWithDefault("LOCAL") - val UI_SQL_GROUP_SUB_EXECUTION_ENABLED = ConfigBuilder("spark.ui.sql.groupSubExecutionEnabled") + val UI_SQL_GROUP_SUB_EXECUTION_ENABLED = ConfigBuilder("spark.ui.groupSQLSubExecutionEnabled") .doc("Whether to group sub executions together in SQL UI when they belong to the same " + "root execution") .version("3.4.0") From ee2c4f5317d5cfd8574f02c06398de9f1d0e893c Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Mon, 9 Jan 2023 22:16:06 -0800 Subject: [PATCH 7/7] address comments --- .../spark/sql/execution/SQLExecution.scala | 35 ++++++------------- .../sql/SQLExecutionUIDataSerializer.scala | 2 +- .../status/api/v1/sql/SqlResourceSuite.scala | 2 +- 3 files changed, 13 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 6883aefec1bc..90468b18a991 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -56,28 +56,6 @@ object SQLExecution { } } - /** - * Track the "root" SQL Execution Id for nested/sub queries. - * For the root execution, rootExecutionId == executionId. - */ - private def setRootExecutionId(sc: SparkContext, executionId: String): Unit = { - // The current execution is the root execution if the root execution ID is null - if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == null) { - sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, executionId) - } - } - - /** - * Unset the "root" SQL Execution Id once the "root" SQL execution completes. 
-   */
-  private def unsetRootExecutionId(sc: SparkContext, executionId: String): Unit = {
-    // The current execution is the root execution if rootExecutionId == executionId.
-    // Unset the property since the root sql execution is complete.
-    if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == executionId) {
-      sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, null)
-    }
-  }
-
   /**
    * Wrap an action that will execute "queryExecution" to track all Spark jobs in the body so that
    * we can connect them with an execution.
@@ -90,7 +68,12 @@ object SQLExecution {
     val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
     val executionId = SQLExecution.nextExecutionId
     sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
-    setRootExecutionId(sc, executionId.toString)
+    // Track the "root" SQL Execution Id for nested/sub queries. The current execution is the
+    // root execution if the root execution ID is null.
+    // And for the root execution, rootExecutionId == executionId.
+    if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == null) {
+      sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, executionId.toString)
+    }
     val rootExecutionId = sc.getLocalProperty(EXECUTION_ROOT_ID_KEY).toLong
     executionIdToQueryExecution.put(executionId, queryExecution)
     try {
@@ -166,7 +149,11 @@ object SQLExecution {
     } finally {
       executionIdToQueryExecution.remove(executionId)
       sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
-      unsetRootExecutionId(sc, executionId.toString)
+      // Unset the "root" SQL Execution Id once the "root" SQL execution completes.
+      // The current execution is the root execution if rootExecutionId == executionId.
+      if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == executionId.toString) {
+        sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, null)
+      }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
index ce22a61cd7cd..1ccaf5c68c92 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -80,7 +80,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe {
     new SQLExecutionUIData(
       executionId = ui.getExecutionId,
-      rootExecutionId = ui.getExecutionId,
+      rootExecutionId = ui.getRootExecutionId,
       description = ui.getDescription,
       details = ui.getDetails,
       physicalPlanDescription = ui.getPhysicalPlanDescription,
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
index 2a3a56cab19f..a0154d724dad 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
@@ -82,7 +82,7 @@ object SqlResourceSuite {
     new SQLExecutionUIData(
       executionId = 0,
-      rootExecutionId = 0,
+      rootExecutionId = 1,
       description = DESCRIPTION,
       details = "",
       physicalPlanDescription = PLAN_DESCRIPTION,
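With the series applied, grouping is on by default under the final flag name from the last two commits. A hedged sketch of opting back into the flat execution list for one application:

```scala
import org.apache.spark.sql.SparkSession

// Flag name per the final revision of config/UI.scala above; "false" restores
// the ungrouped SQL/DataFrame execution list.
val spark = SparkSession.builder()
  .appName("sql-ui-demo")
  .config("spark.ui.groupSQLSubExecutionEnabled", "false")
  .getOrCreate()
```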