diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 04c857f7c3c9..e9aaad261f9e 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -417,6 +417,7 @@ message SQLExecutionUIData {
   repeated int64 stages = 11;
   bool metric_values_is_null = 12;
   map<int64, string> metric_values = 13;
+  optional int64 root_execution_id = 14;
 }
 
 message SparkPlanGraphNode {
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 0252bc800471..f952f86503e3 100755
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -187,6 +187,18 @@ pre {
   display: none;
 }
 
+.sub-execution-list {
+  font-size: 0.9rem;
+}
+
+.sub-execution-list.collapsed {
+  display: none;
+}
+
+.table-striped .sub-execution-list table tr {
+  background-color: inherit;
+}
+
 .description-input {
   overflow: hidden;
   text-overflow: ellipsis;
diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
index d09620b8e34a..a32e60de2a45 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
@@ -229,4 +229,11 @@ private[spark] object UI {
     .stringConf
     .transform(_.toUpperCase(Locale.ROOT))
     .createWithDefault("LOCAL")
+
+  val UI_SQL_GROUP_SUB_EXECUTION_ENABLED = ConfigBuilder("spark.ui.groupSQLSubExecutionEnabled")
+    .doc("Whether to group sub executions together in SQL UI when they belong to the same " +
+      "root execution")
+    .version("3.4.0")
+    .booleanConf
+    .createWithDefault(true)
 }
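The new flag defaults to `true`, so grouping is active out of the box. A minimal, hedged sketch of opting out at session startup; only the `spark.ui.groupSQLSubExecutionEnabled` key comes from this patch, the rest is ordinary Spark session wiring:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: render the SQL tab as a flat execution list again by disabling
// the grouping flag introduced above.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("flat-sql-ui")
  .config("spark.ui.groupSQLSubExecutionEnabled", "false")
  .getOrCreate()
```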
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index bb4cea474fe0..90468b18a991 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
 object SQLExecution {
 
   val EXECUTION_ID_KEY = "spark.sql.execution.id"
+  val EXECUTION_ROOT_ID_KEY = "spark.sql.execution.root.id"
 
   private val _nextExecutionId = new AtomicLong(0)
 
@@ -67,6 +68,13 @@ object SQLExecution {
     val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
     val executionId = SQLExecution.nextExecutionId
     sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
+    // Track the "root" SQL execution ID for nested/sub queries. The current execution is the
+    // root execution if the root execution ID is null, and for the root execution,
+    // rootExecutionId == executionId.
+    if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == null) {
+      sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, executionId.toString)
+    }
+    val rootExecutionId = sc.getLocalProperty(EXECUTION_ROOT_ID_KEY).toLong
     executionIdToQueryExecution.put(executionId, queryExecution)
     try {
       // sparkContext.getCallSite() would first try to pick up any call site that was previously
@@ -98,6 +106,7 @@ object SQLExecution {
       try {
         sc.listenerBus.post(SparkListenerSQLExecutionStart(
           executionId = executionId,
+          rootExecutionId = rootExecutionId,
           description = desc,
           details = callSite.longForm,
           physicalPlanDescription = queryExecution.explainString(planDescriptionMode),
@@ -140,6 +149,11 @@ object SQLExecution {
     } finally {
       executionIdToQueryExecution.remove(executionId)
       sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
+      // Unset the "root" SQL execution ID once the root SQL execution completes.
+      // The current execution is the root execution if rootExecutionId == executionId.
+      if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == executionId.toString) {
+        sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, null)
+      }
     }
   }
 
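The bookkeeping above is easiest to see in isolation: the first execution started on a thread becomes the root, nested executions inherit its ID, and the property is cleared only when the root finishes. A toy model of that idea, assuming single-threaded nesting (all names here are invented for illustration; Spark itself keeps the ID in `SparkContext` local properties):

```scala
import java.util.concurrent.atomic.AtomicLong

object RootIdModel {
  private val nextId = new AtomicLong(0)
  private val rootId = new ThreadLocal[Option[Long]] {
    override def initialValue(): Option[Long] = None
  }

  // The outermost call on a thread becomes the root; nested calls inherit its
  // ID, so id == root holds exactly for root executions.
  def withExecution[T](body: (Long, Long) => T): T = {
    val id = nextId.getAndIncrement()
    val isRoot = rootId.get.isEmpty
    if (isRoot) rootId.set(Some(id))
    try body(id, rootId.get.get)
    finally if (isRoot) rootId.set(None) // unset only when the root completes
  }
}

// RootIdModel.withExecution { (outer, r1) =>
//   RootIdModel.withExecution { (inner, r2) =>
//     assert(r1 == outer && r2 == outer && inner != outer)
//   }
// }
```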
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index a7adc9431c35..cd8f31b3c21d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -26,6 +26,7 @@ import scala.xml.{Node, NodeSeq}
 
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI.UI_SQL_GROUP_SUB_EXECUTION_ENABLED
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
 import org.apache.spark.util.Utils
@@ -33,33 +34,55 @@ import org.apache.spark.util.Utils
 private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging {
 
   private val sqlStore = parent.sqlStore
+  private val groupSubExecutionEnabled = parent.conf.get(UI_SQL_GROUP_SUB_EXECUTION_ENABLED)
 
   override def render(request: HttpServletRequest): Seq[Node] = {
     val currentTime = System.currentTimeMillis()
     val running = new mutable.ArrayBuffer[SQLExecutionUIData]()
     val completed = new mutable.ArrayBuffer[SQLExecutionUIData]()
     val failed = new mutable.ArrayBuffer[SQLExecutionUIData]()
+    val executionIdToSubExecutions =
+      new mutable.HashMap[Long, mutable.ArrayBuffer[SQLExecutionUIData]]()
 
     sqlStore.executionsList().foreach { e =>
-      if (e.errorMessage.isDefined) {
-        if (e.errorMessage.get.isEmpty) {
-          completed += e
+      def processExecution(e: SQLExecutionUIData): Unit = {
+        if (e.errorMessage.isDefined) {
+          if (e.errorMessage.get.isEmpty) {
+            completed += e
+          } else {
+            failed += e
+          }
+        } else if (e.completionTime.isEmpty) {
+          running += e
         } else {
-          failed += e
+          // When `completionTime` is present, it means the query execution is completed and
+          // `errorMessage` should be present as well. However, events generated by old versions of
+          // Spark do not have the `errorMessage` field. We have to check the status of this query
+          // execution's jobs.
+          val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
+          if (isFailed) {
+            failed += e
+          } else {
+            completed += e
+          }
         }
-      } else if (e.completionTime.isEmpty) {
-        running += e
+      }
+      // Group the sub execution only if the root execution will be displayed (i.e. not missing).
+      if (groupSubExecutionEnabled &&
+          e.executionId != e.rootExecutionId &&
+          executionIdToSubExecutions.contains(e.rootExecutionId)) {
+        executionIdToSubExecutions(e.rootExecutionId) += e
       } else {
-        // When `completionTime` is present, it means the query execution is completed and
-        // `errorMessage` should be present as well. However, events generated by old versions of
-        // Spark do not have the `errorMessage` field. We have to check the status of this query
-        // execution's jobs.
-        val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
-        if (isFailed) {
-          failed += e
-        } else {
-          completed += e
+        if (groupSubExecutionEnabled) {
+          // Add the execution ID to indicate it will be displayed as a root, so the executions
+          // with the same root execution ID will be added here and displayed as sub executions.
+          // If the root execution is not found (e.g. event loss), then the sub executions will
+          // be displayed in the root list instead.
+          // NOTE: this code assumes the root execution ID always comes first, which is guaranteed
+          // by `sqlStore.executionsList()`.
+          executionIdToSubExecutions(e.executionId) = new mutable.ArrayBuffer[SQLExecutionUIData]()
         }
+        processExecution(e)
       }
     }
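The pass above interleaves status bucketing with grouping, which makes the invariant easy to miss: an execution becomes a top-level row unless its root has already been registered. A stripped-down model of just the grouping decision, assuming (as `sqlStore.executionsList()` guarantees) that a root's ID is seen before its sub executions; `Exec` and `group` are illustrative names, not Spark API:

```scala
import scala.collection.mutable

case class Exec(id: Long, rootId: Long)

def group(execs: Seq[Exec]): (Seq[Exec], Map[Long, Seq[Exec]]) = {
  val topLevel = mutable.ArrayBuffer[Exec]()
  val subs = mutable.HashMap[Long, mutable.ArrayBuffer[Exec]]()
  execs.foreach { e =>
    if (e.id != e.rootId && subs.contains(e.rootId)) {
      subs(e.rootId) += e // nest under the displayed root
    } else {
      subs(e.id) = mutable.ArrayBuffer[Exec]() // displayed as a root row
      topLevel += e
    }
  }
  (topLevel.toSeq, subs.map { case (k, v) => k -> v.toSeq }.toMap)
}

// A sub whose root was evicted falls back to the top level:
// group(Seq(Exec(0, 0), Exec(1, 0), Exec(2, 100)))
//   == (Seq(Exec(0, 0), Exec(2, 100)), Map(0L -> Seq(Exec(1, 0)), 2L -> Seq()))
```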
@@ -68,7 +91,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging {
 
     if (running.nonEmpty) {
       val runningPageTable =
-        executionsTable(request, "running", running.toSeq, currentTime, true, true, true)
+        executionsTable(request, "running", running.toSeq,
+          executionIdToSubExecutions.mapValues(_.toSeq).toMap, currentTime, true, true, true)
 
       _content ++=
@@ -210,7 +238,9 @@ private[ui] class ExecutionPagedTable(
     currentTime: Long,
     showRunningJobs: Boolean,
     showSucceededJobs: Boolean,
-    showFailedJobs: Boolean) extends PagedTable[ExecutionTableRowData] {
+    showFailedJobs: Boolean,
+    subExecutions: Map[Long, Seq[SQLExecutionUIData]] = Map.empty)
+  extends PagedTable[ExecutionTableRowData] {
 
   private val (sortColumn, desc, pageSize) = getTableParameters(request, executionTag, "ID")
 
@@ -224,11 +254,14 @@ private[ui] class ExecutionPagedTable(
     desc,
     showRunningJobs,
     showSucceededJobs,
-    showFailedJobs)
+    showFailedJobs,
+    subExecutions)
 
   private val parameterPath =
     s"$basePath/$subPath/?${getParameterOtherTable(request, executionTag)}"
 
+  private val showSubExecutions = subExecutions.nonEmpty
+
   override def tableId: String = s"$executionTag-table"
 
   override def tableCssClass: String =
@@ -250,32 +283,39 @@ private[ui] class ExecutionPagedTable(
   override def goButtonFormPath: String =
     s"$parameterPath&$executionTag.sort=$encodedSortColumn&$executionTag.desc=$desc#$tableHeaderId"
 
-  override def headers: Seq[Node] = {
-    // Information for each header: title, sortable, tooltip
-    val executionHeadersAndCssClasses: Seq[(String, Boolean, Option[String])] =
-      Seq(
-        ("ID", true, None),
-        ("Description", true, None),
-        ("Submitted", true, None),
-        ("Duration", true, Some("Time from query submission to completion (or if still executing," +
-          "time since submission)"))) ++ {
-        if (showRunningJobs && showSucceededJobs && showFailedJobs) {
-          Seq(
-            ("Running Job IDs", true, None),
-            ("Succeeded Job IDs", true, None),
-            ("Failed Job IDs", true, None))
-        } else if (showSucceededJobs && showFailedJobs) {
-          Seq(
-            ("Succeeded Job IDs", true, None),
-            ("Failed Job IDs", true, None))
-        } else {
-          Seq(("Job IDs", true, None))
-        }
+  // Information for each header: title, sortable, tooltip
+  private val headerInfo: Seq[(String, Boolean, Option[String])] = {
+    Seq(
+      ("ID", true, None),
+      ("Description", true, None),
+      ("Submitted", true, None),
+      ("Duration", true, Some("Time from query submission to completion (or if still executing," +
+        " time since submission)"))) ++ {
+      if (showRunningJobs && showSucceededJobs && showFailedJobs) {
+        Seq(
+          ("Running Job IDs", true, None),
+          ("Succeeded Job IDs", true, None),
+          ("Failed Job IDs", true, None))
+      } else if (showSucceededJobs && showFailedJobs) {
+        Seq(
+          ("Succeeded Job IDs", true, None),
+          ("Failed Job IDs", true, None))
+      } else {
+        Seq(("Job IDs", true, None))
       }
+    } ++ {
+      if (showSubExecutions) {
+        Seq(("Sub Execution IDs", true, None))
+      } else {
+        Nil
+      }
+    }
+  }
 
-    isSortColumnValid(executionHeadersAndCssClasses, sortColumn)
+  override def headers: Seq[Node] = {
+    isSortColumnValid(headerInfo, sortColumn)
 
-    headerRow(executionHeadersAndCssClasses, desc, pageSize, sortColumn, parameterPath,
+    headerRow(headerInfo, desc, pageSize, sortColumn, parameterPath,
       executionTag, tableHeaderId)
   }
 
@@ -290,35 +330,119 @@ private[ui] class ExecutionPagedTable(
       }
     }
 
-    <tr>
-      <td>
-        {executionUIData.executionId.toString}
-      </td>
-      <td>
-        {descriptionCell(executionUIData)}
-      </td>
-      <td sorttable_customkey={submissionTime.toString}>
-        {UIUtils.formatDate(submissionTime)}
-      </td>
-      <td sorttable_customkey={duration.toString}>
-        {UIUtils.formatDuration(duration)}
-      </td>
-      {if (showRunningJobs) {
-        <td>
-          {jobLinks(executionTableRow.runningJobData)}
-        </td>
-      }}
-      {if (showSucceededJobs) {
-        <td>
-          {jobLinks(executionTableRow.completedJobData)}
-        </td>
-      }}
-      {if (showFailedJobs) {
-        <td>
-          {jobLinks(executionTableRow.failedJobData)}
-        </td>
-      }}
-    </tr>
+    def executionLinks(executionData: Seq[Long]): Seq[Node] = {
+      val details = if (executionData.nonEmpty) {
+        val onClickScript = "this.parentNode.parentNode.nextElementSibling.nextElementSibling" +
+          ".classList.toggle('collapsed')"
+        <span onclick={onClickScript} class="expand-details">
+          +details
+        </span>
+      } else {
+        Nil
+      }
+
+      <div>
+        {
+          executionData.map { executionId =>
+            <a href={executionURL(executionId)}>[{executionId.toString}]</a>
+          }
+        }
+      </div> ++ details
+    }
+
+    val baseRow: Seq[Node] = {
+      <tr>
+        <td>
+          {executionUIData.executionId.toString}
+        </td>
+        <td>
+          {descriptionCell(executionUIData)}
+        </td>
+        <td sorttable_customkey={submissionTime.toString}>
+          {UIUtils.formatDate(submissionTime)}
+        </td>
+        <td sorttable_customkey={duration.toString}>
+          {UIUtils.formatDuration(duration)}
+        </td>
+        {if (showRunningJobs) {
+          <td>
+            {jobLinks(executionTableRow.runningJobData)}
+          </td>
+        }}
+        {if (showSucceededJobs) {
+          <td>
+            {jobLinks(executionTableRow.completedJobData)}
+          </td>
+        }}
+        {if (showFailedJobs) {
+          <td>
+            {jobLinks(executionTableRow.failedJobData)}
+          </td>
+        }}
+        {if (showSubExecutions) {
+          <td>
+            {executionLinks(executionTableRow.subExecutionData.map(_.executionUIData.executionId))}
+          </td>
+        }}
+      </tr>
+    }
+
+    val subRow: Seq[Node] = if (executionTableRow.subExecutionData.nonEmpty) {
+      <tr class="sub-execution-list collapsed">
+        <td colspan={headerInfo.size.toString}>
+          <table class="table table-striped">
+            <thead>
+              <tr>
+                {headerInfo.dropRight(1).map(info => <th>{info._1}</th>)}
+              </tr>
+            </thead>
+            <tbody>
+              {
+                executionTableRow.subExecutionData.map { rowData =>
+                  val executionUIData = rowData.executionUIData
+                  val submissionTime = executionUIData.submissionTime
+                  val duration = rowData.duration
+
+                  <tr>
+                    <td>
+                      {executionUIData.executionId.toString}
+                    </td>
+                    <td>
+                      {descriptionCell(executionUIData)}
+                    </td>
+                    <td sorttable_customkey={submissionTime.toString}>
+                      {UIUtils.formatDate(submissionTime)}
+                    </td>
+                    <td sorttable_customkey={duration.toString}>
+                      {UIUtils.formatDuration(duration)}
+                    </td>
+                    {if (showRunningJobs) {
+                      <td>
+                        {jobLinks(rowData.runningJobData)}
+                      </td>
+                    }}
+                    {if (showSucceededJobs) {
+                      <td>
+                        {jobLinks(rowData.completedJobData)}
+                      </td>
+                    }}
+                    {if (showFailedJobs) {
+                      <td>
+                        {jobLinks(rowData.failedJobData)}
+                      </td>
+                    }}
+                  </tr>
+                }
+              }
+            </tbody>
+          </table>
+        </td>
+      </tr>
+    } else {
+      Nil
+    }
+
+    baseRow ++ subRow
   }
 
   private def descriptionCell(execution: SQLExecutionUIData): Seq[Node] = {
@@ -358,7 +482,8 @@ private[ui] class ExecutionTableRowData(
     val executionUIData: SQLExecutionUIData,
     val runningJobData: Seq[Int],
     val completedJobData: Seq[Int],
-    val failedJobData: Seq[Int])
+    val failedJobData: Seq[Int],
+    val subExecutionData: Seq[ExecutionTableRowData])
 
 
 private[ui] class ExecutionDataSource(
@@ -369,7 +494,9 @@ private[ui] class ExecutionDataSource(
     desc: Boolean,
     showRunningJobs: Boolean,
     showSucceededJobs: Boolean,
-    showFailedJobs: Boolean) extends PagedDataSource[ExecutionTableRowData](pageSize) {
+    showFailedJobs: Boolean,
+    subExecutions: Map[Long, Seq[SQLExecutionUIData]])
+  extends PagedDataSource[ExecutionTableRowData](pageSize) {
 
   // Convert ExecutionData to ExecutionTableRowData which contains the final contents to show
   // in the table so that we can avoid creating duplicate contents during sorting the data
@@ -401,12 +528,18 @@ private[ui] class ExecutionDataSource(
       }.map { case (jobId, _) => jobId }.toSeq.sorted
     } else Seq.empty
 
+    val executions = subExecutions.get(executionUIData.executionId) match {
+      case Some(executions) => executions.map(executionRow)
+      case _ => Seq.empty
+    }
+
     new ExecutionTableRowData(
      duration,
      executionUIData,
      runningJobData,
      completedJobData,
-      failedJobData)
+      failedJobData,
+      executions)
   }
 
   /** Return Ordering according to sortColumn and desc. */
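The sub executions render as a sibling `<tr>` hidden by the `collapsed` CSS class from webui.css, and the `+details` span toggles it. A simplified sketch of that wiring (one sibling hop here, whereas the real handler in `executionLinks` walks two siblings because of how the rendered rows end up laid out; parts of the page markup above are reconstructed, so treat this as the pattern rather than the production code):

```scala
import scala.xml.Node

// Sketch: a visible summary row whose click handler toggles the "collapsed"
// class on the detail row that immediately follows it.
def collapsiblePair(summary: Seq[Node], detail: Seq[Node]): Seq[Node] = {
  val toggle = "this.parentNode.parentNode.nextElementSibling" +
    ".classList.toggle('collapsed')"
  <tr>
    <td>
      <span onclick={toggle} class="expand-details">+details</span>
      {summary}
    </td>
  </tr> ++
  <tr class="sub-execution-list collapsed">
    <td>{detail}</td>
  </tr>
}
```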
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 64c652542ffa..32b215b1c2e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -90,6 +90,7 @@ class SQLAppStatusListener(
     // data corresponding to the execId.
     val sqlStoreData = kvstore.read(classOf[SQLExecutionUIData], executionId)
     val executionData = new LiveExecutionData(executionId)
+    executionData.rootExecutionId = sqlStoreData.rootExecutionId
     executionData.description = sqlStoreData.description
     executionData.details = sqlStoreData.details
     executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
@@ -340,7 +341,7 @@ class SQLAppStatusListener(
   }
 
   private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
-    val SparkListenerSQLExecutionStart(executionId, description, details,
+    val SparkListenerSQLExecutionStart(executionId, rootExecutionId, description, details,
       physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs) = event
 
     val planGraph = SparkPlanGraph(sparkPlanInfo)
@@ -355,6 +356,7 @@ class SQLAppStatusListener(
     kvstore.write(graphToStore)
 
     val exec = getOrCreateExecution(executionId)
+    exec.rootExecutionId = rootExecutionId
     exec.description = description
     exec.details = details
     exec.physicalPlanDescription = physicalPlanDescription
@@ -483,6 +485,7 @@ class SQLAppStatusListener(
 
 private class LiveExecutionData(val executionId: Long) extends LiveEntity {
 
+  var rootExecutionId: Long = _
   var description: String = null
   var details: String = null
   var physicalPlanDescription: String = null
@@ -505,6 +508,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   override protected def doUpdate(): Any = {
     new SQLExecutionUIData(
       executionId,
+      rootExecutionId,
       description,
       details,
       physicalPlanDescription,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 46827aaa1d94..6c92b98ca3d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -83,6 +83,7 @@ class SQLAppStatusStore(
 
 class SQLExecutionUIData(
     @KVIndexParam val executionId: Long,
+    val rootExecutionId: Long,
     val description: String,
     val details: String,
     val physicalPlanDescription: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index ce665e118938..b931b4fcde1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -43,6 +43,8 @@ case class SparkListenerSQLAdaptiveSQLMetricUpdates(
 
 @DeveloperApi
 case class SparkListenerSQLExecutionStart(
     executionId: Long,
+    // if the execution is a root, then rootExecutionId == executionId
+    rootExecutionId: Long,
     description: String,
     details: String,
     physicalPlanDescription: String,
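Adding a field to the `SparkListenerSQLExecutionStart` case class changes its pattern-match arity, which is why every match in this diff grows an extra `_`. External listeners compiled against the old event need the same adjustment; a hedged sketch of such a consumer (the listener class itself is hypothetical):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

// Logs only root executions: rootExecutionId == executionId identifies a root.
class RootExecutionLogger extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerSQLExecutionStart(id, rootId, desc, _, _, _, _, _) if id == rootId =>
      println(s"root execution $id started: $desc")
    case _ => // ignore sub executions and other events
  }
}
```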
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index 8a258fb12197..5aa6ddbdb7af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.sql.execution.ui
 
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
 class SQLTab(val sqlStore: SQLAppStatusStore, sparkUI: SparkUI)
   extends SparkUITab(sparkUI, "SQL") with Logging {
 
+  def conf: SparkConf = sparkUI.conf
+
   override val name = "SQL / DataFrame"
 
   val parent = sparkUI
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
index 09cef9663c01..1ccaf5c68c92 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -80,6 +80,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe {
 
     new SQLExecutionUIData(
       executionId = ui.getExecutionId,
+      rootExecutionId = ui.getRootExecutionId,
       description = ui.getDescription,
       details = ui.getDetails,
       physicalPlanDescription = ui.getPhysicalPlanDescription,
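Because `root_execution_id` is declared `optional`, records written by older Spark versions still parse; the generated accessor then reports field absence instead of failing. A hedged sketch of a fallback a reader could apply (this helper is not part of the patch, and the `StoreTypes` accessor names are assumed from protobuf's Java codegen conventions):

```scala
import org.apache.spark.status.protobuf.StoreTypes

// Treat a record without root_execution_id (written by an older Spark) as its
// own root rather than trusting the proto3 default of 0.
def resolveRootId(ui: StoreTypes.SQLExecutionUIData): Long =
  if (ui.hasRootExecutionId) ui.getRootExecutionId else ui.getExecutionId
```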
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
index 9f2b08a65eaf..e9d98ee97157 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
@@ -59,7 +59,7 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
 
     val reconstructedEvent = JsonProtocol.sparkEventFromJson(SQLExecutionStartJsonString)
     if (newExecutionStartEvent) {
-      val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail",
+      val expectedEvent = SparkListenerSQLExecutionStart(0, 0, "test desc", "test detail",
         "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0,
         Map("k1" -> "v1"))
       assert(reconstructedEvent == expectedEvent)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
index 090c149886a8..42b27bd9f28e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
@@ -57,7 +57,7 @@ class SQLEventFilterBuilderSuite extends SparkFunSuite {
     }
 
     // Start SQL Execution
-    listener.onOtherEvent(SparkListenerSQLExecutionStart(1, "desc1", "details1", "plan",
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(1, 1, "desc1", "details1", "plan",
       new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time, Map.empty))
 
     time += 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
index 724df8ebe8bf..f1b77e502dfe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
@@ -41,7 +41,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite {
     val acceptFn = filter.acceptFn().lift
 
     // Verifying with finished SQL execution 1
-    assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, "description1", "details1",
+    assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, 1, "description1", "details1",
       "plan", null, 0, Map.empty)))
     assert(Some(false) === acceptFn(SparkListenerSQLExecutionEnd(1, 0)))
     assert(Some(false) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(1, "plan", null)))
@@ -88,7 +88,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite {
     }
 
     // Verifying with live SQL execution 2
-    assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, "description2", "details2",
+    assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, 2, "description2", "details2",
       "plan", null, 0, Map.empty)))
     assert(Some(true) === acceptFn(SparkListenerSQLExecutionEnd(2, 0)))
     assert(Some(true) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(2, "plan", null)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
index a53687112602..7af58867f33a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
+import org.apache.spark.internal.config.UI.UI_SQL_GROUP_SUB_EXECUTION_ENABLED
 import org.apache.spark.scheduler.{JobFailed, SparkListenerJobEnd, SparkListenerJobStart}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
@@ -60,6 +61,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
     when(tab.sqlStore).thenReturn(statusStore)
 
     val request = mock(classOf[HttpServletRequest])
+    when(tab.conf).thenReturn(new SparkConf(false))
     when(tab.appName).thenReturn("testing")
     when(tab.headerTabs).thenReturn(Seq.empty)
 
@@ -74,6 +76,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
     when(tab.sqlStore).thenReturn(statusStore)
 
     val request = mock(classOf[HttpServletRequest])
+    when(tab.conf).thenReturn(new SparkConf(false))
     when(tab.appName).thenReturn("testing")
     when(tab.headerTabs).thenReturn(Seq.empty)
 
@@ -82,6 +85,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
     val page = new AllExecutionsPage(tab)
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      0,
       0,
       "test",
       "test",
@@ -109,6 +113,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
     val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
 
     val request = mock(classOf[HttpServletRequest])
+    when(tab.conf).thenReturn(new SparkConf(false))
     when(tab.sqlStore).thenReturn(statusStore)
     when(tab.appName).thenReturn("testing")
     when(tab.headerTabs).thenReturn(Seq.empty)
@@ -121,6 +126,50 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
     assert(html.contains("duration"))
   }
 
+  test("group sub executions") {
+    val statusStore = createStatusStore
+    val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
+    val request = mock(classOf[HttpServletRequest])
+
+    val sparkConf = new SparkConf(false).set(UI_SQL_GROUP_SUB_EXECUTION_ENABLED, true)
+    when(tab.conf).thenReturn(sparkConf)
+    when(tab.sqlStore).thenReturn(statusStore)
+    when(tab.appName).thenReturn("testing")
+    when(tab.headerTabs).thenReturn(Seq.empty)
+
+    val listener = statusStore.listener.get
+    val page = new AllExecutionsPage(tab)
+    val df = createTestDataFrame
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      0,
+      0,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      1,
+      0,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+    // sub execution has a missing root execution
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      2,
+      100,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+    val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+    assert(html.contains("sub execution ids") && html.contains("sub-execution-list"))
+    // sub execution should still be displayed if the root execution is missing
+    assert(html.contains("id=2"))
+  }
 
   protected def createStatusStore: SQLAppStatusStore
 
@@ -146,6 +195,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
     Seq(0, 1).foreach { executionId =>
       val df = createTestDataFrame
       listener.onOtherEvent(SparkListenerSQLExecutionStart(
+        executionId,
         executionId,
         "test",
         "test",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
index d994126fe636..3b9efb180578 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
@@ -74,6 +74,7 @@ object MetricsAggregationBenchmark extends BenchmarkBase {
     val idgen = new AtomicInteger()
     val executionId = idgen.incrementAndGet()
     val executionStart = SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       getClass().getName(),
       getClass().getName(),
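The suites below simply pass the root ID explicitly. Outside of tests, a nested pair arises whenever an action runs inside another execution's scope; a hedged sketch (assumes a local session and that `SQLExecution.withNewExecutionId` is reachable from the calling code, since it lives in `org.apache.spark.sql.execution`):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SQLExecution

val spark = SparkSession.builder().master("local[2]").getOrCreate()
import spark.implicits._

val df = Seq(1, 2, 3).toDF("v")
SQLExecution.withNewExecutionId(df.queryExecution, Some("root")) {
  // count() starts its own execution; it inherits the outer execution's ID as
  // rootExecutionId, so the SQL tab groups it under "root".
  df.count()
}
```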
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 3d80935fdda6..26bb35cf0e53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -190,6 +190,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils {
     }.toMap
 
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       "test",
       "test",
@@ -343,7 +344,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils {
     val listener = new SparkListener {
       override def onOtherEvent(event: SparkListenerEvent): Unit = {
         event match {
-          case SparkListenerSQLExecutionStart(_, _, _, planDescription, _, _, _) =>
+          case SparkListenerSQLExecutionStart(_, _, _, _, planDescription, _, _, _) =>
             assert(expected.forall(planDescription.contains))
             checkDone = true
           case _ => // ignore other events
@@ -380,6 +381,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils {
     val executionId = 0
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       "test",
       "test",
@@ -410,6 +412,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils {
     val executionId = 0
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       "test",
       "test",
@@ -451,6 +454,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils {
     val executionId = 0
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       "test",
       "test",
@@ -481,6 +485,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils {
     val executionId = 0
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       "test",
       "test",
@@ -512,6 +517,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils {
     val executionId = 0
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       "test",
       "test",
@@ -652,6 +658,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils {
     // Start execution 1 and execution 2
     time += 1
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      1,
       1,
       "test",
       "test",
@@ -661,6 +668,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils {
       Map.empty))
     time += 1
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      2,
       2,
       "test",
       "test",
@@ -678,6 +686,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils {
     // Start execution 3 and execution 2 should be evicted.
     time += 1
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      3,
       3,
       "test",
       "test",
@@ -714,6 +723,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils {
       .allNodes.flatMap(_.metrics.map(_.accumulatorId))
 
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       "test",
       "test",
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
index f6fd1fc42ce3..a0154d724dad 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
@@ -82,6 +82,7 @@ object SqlResourceSuite {
 
     new SQLExecutionUIData(
       executionId = 0,
+      rootExecutionId = 1,
       description = DESCRIPTION,
       details = "",
       physicalPlanDescription = PLAN_DESCRIPTION,
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index f7b783ef3ca1..ddc693f1ee3d 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -52,6 +52,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
 
     val input1 = new SQLExecutionUIData(
       executionId = templateData.executionId,
+      rootExecutionId = templateData.rootExecutionId,
       description = templateData.description,
       details = templateData.details,
       physicalPlanDescription = templateData.physicalPlanDescription,
@@ -71,6 +72,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
 
     val input2 = new SQLExecutionUIData(
       executionId = templateData.executionId,
+      rootExecutionId = templateData.rootExecutionId,
       description = templateData.description,
       details = templateData.details,
       physicalPlanDescription = templateData.physicalPlanDescription,