Commit 0745dae

ulysses-you authored and cloud-fan committed
[SPARK-40834][SQL] Use SparkListenerSQLExecutionEnd to track final SQL status in UI
### What changes were proposed in this pull request?

Use the `SparkListenerSQLExecutionEnd` event to track whether the SQL/DataFrame execution has completed, instead of inferring it from job status.

### Why are the changes needed?

A SQL query may succeed even though some of its jobs fail. For example, in an inner join with one empty side and one large side, the plan can finish while the job on the large side is still running.

### Does this PR introduce _any_ user-facing change?

Yes, it corrects the SQL status shown in the UI.

### How was this patch tested?

Added a test for backward compatibility, plus a manual test:

```sql
CREATE TABLE t1 (c1 int) USING PARQUET;
CREATE TABLE t2 USING PARQUET AS SELECT 1 AS c2;
```

```bash
./bin/spark-sql -e "SELECT /*+ merge(tmp) */ * FROM t1 JOIN (SELECT c2, java_method('java.lang.Thread', 'sleep', 10000L) FROM t2) tmp ON c1 = c2;"
```

before:
<img width="1712" alt="image" src="https://user-images.githubusercontent.com/12025282/196576790-7e4eeb29-024f-4ac3-bdec-f4e894448b57.png">

after:
<img width="1709" alt="image" src="https://user-images.githubusercontent.com/12025282/196576674-15d80366-bd42-417b-80bf-eeec0b1ef046.png">

Closes apache#38302 from ulysses-you/sql-end.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 5a2da01 · commit 0745dae

File tree: 10 files changed, +107 / -19 lines

core/src/main/scala/org/apache/spark/SparkThrowableHelper.scala (14 additions, 0 deletions)

```diff
@@ -19,6 +19,8 @@ package org.apache.spark

 import scala.collection.JavaConverters._

+import com.fasterxml.jackson.core.util.MinimalPrettyPrinter
+
 import org.apache.spark.util.JsonProtocol.toJsonString
 import org.apache.spark.util.Utils

@@ -119,4 +121,16 @@ private[spark] object SparkThrowableHelper {
       }
     }
   }
+
+  def getMessage(throwable: Throwable): String = {
+    toJsonString { generator =>
+      val g = generator.setPrettyPrinter(new MinimalPrettyPrinter)
+      g.writeStartObject()
+      g.writeStringField("errorClass", throwable.getClass.getCanonicalName)
+      g.writeObjectFieldStart("messageParameters")
+      g.writeStringField("message", throwable.getMessage)
+      g.writeEndObject()
+      g.writeEndObject()
+    }
+  }
 }
```
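For a plain `Exception`, this helper produces a compact JSON string; a minimal sketch of the output (the same string is asserted verbatim in the test suites further down):

```scala
import org.apache.spark.SparkThrowableHelper

// Sketch only: SparkThrowableHelper is private[spark], so this compiles
// only from code inside the org.apache.spark package.
val msg = SparkThrowableHelper.getMessage(new Exception("test"))
assert(msg == """{"errorClass":"java.lang.Exception","messageParameters":{"message":"test"}}""")
```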

sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala (10 additions, 2 deletions)

```diff
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future => JFuture}
 import java.util.concurrent.atomic.AtomicLong

-import org.apache.spark.SparkContext
+import org.apache.spark.{ErrorMessageFormat, SparkContext, SparkThrowable, SparkThrowableHelper}
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
@@ -115,7 +115,15 @@ object SQLExecution {
         throw e
       } finally {
         val endTime = System.nanoTime()
-        val event = SparkListenerSQLExecutionEnd(executionId, System.currentTimeMillis())
+        val errorMessage = ex.map {
+          case e: SparkThrowable =>
+            SparkThrowableHelper.getMessage(e, ErrorMessageFormat.STANDARD)
+          case e =>
+            // unexpected behavior
+            SparkThrowableHelper.getMessage(e)
+        }
+        val event = SparkListenerSQLExecutionEnd(
+          executionId, System.currentTimeMillis(), errorMessage)
         // Currently only `Dataset.withAction` and `DataFrameWriter.runCommand` specify the `name`
         // parameter. The `ExecutionListenerManager` only watches SQL executions with name. We
         // can specify the execution name in more places in the future, so that
```
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala (4 additions, 7 deletions)

```diff
@@ -41,15 +41,12 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
     val failed = new mutable.ArrayBuffer[SQLExecutionUIData]()

     sqlStore.executionsList().foreach { e =>
-      val isRunning = e.completionTime.isEmpty ||
-        e.jobs.exists { case (_, status) => status == JobExecutionStatus.RUNNING }
-      val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
-      if (isRunning) {
-        running += e
-      } else if (isFailed) {
+      if (e.errorMessage.isDefined) {
         failed += e
-      } else {
+      } else if (e.completionTime.nonEmpty) {
         completed += e
+      } else {
+        running += e
       }
     }

```
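The net effect is that the recorded error message now takes precedence over per-job status. A hypothetical standalone rendering of the new rule (the names `SqlStatus` and `classify` are illustrative, not part of the patch):

```scala
import java.util.Date

// Hypothetical helper mirroring the new bucketing above: a recorded error
// means failed, completion without an error means completed, otherwise the
// execution is still running.
sealed trait SqlStatus
case object Failed extends SqlStatus
case object Completed extends SqlStatus
case object Running extends SqlStatus

def classify(errorMessage: Option[String], completionTime: Option[Date]): SqlStatus =
  if (errorMessage.isDefined) Failed
  else if (completionTime.nonEmpty) Completed
  else Running
```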

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala (5 additions, 2 deletions)

```diff
@@ -393,9 +393,10 @@ class SQLAppStatusListener(
   }

   private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
-    val SparkListenerSQLExecutionEnd(executionId, time) = event
+    val SparkListenerSQLExecutionEnd(executionId, time, errorMessage) = event
     Option(liveExecutions.get(executionId)).foreach { exec =>
       exec.completionTime = Some(new Date(time))
+      exec.errorMessage = errorMessage
       update(exec)

       // Aggregating metrics can be expensive for large queries, so do it asynchronously. The end
@@ -494,6 +495,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   var driverAccumUpdates = Seq[(Long, Long)]()

   @volatile var metricsValues: Map[Long, String] = null
+  var errorMessage: Option[String] = None

   // Just in case job end and execution end arrive out of order, keep track of how many
   // end events arrived so that the listener can stop tracking the execution.
@@ -511,7 +513,8 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
       completionTime,
       jobs,
       stages,
-      metricsValues)
+      metricsValues,
+      errorMessage)
   }

 }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala (2 additions, 1 deletion)

```diff
@@ -100,7 +100,8 @@ class SQLExecutionUIData(
    * from the SQL listener instance.
    */
   @JsonDeserialize(keyAs = classOf[JLong])
-  val metricValues: Map[Long, String]) {
+  val metricValues: Map[Long, String],
+  val errorMessage: Option[String]) {

   @JsonIgnore @KVIndex("completionTime")
   private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
```
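Since `SQLExecutionUIData` is what the status store persists, the new field survives replay from the event log. A read-side sketch, assuming an initialized `SQLAppStatusStore` and its `execution(id)` lookup:

```scala
import org.apache.spark.sql.execution.ui.SQLAppStatusStore

// Read-side sketch: once the listener records the end event, the stored UI
// data carries the error (None for runs that finished cleanly or are still
// in flight). `statusStore` is assumed to be a populated store.
def describe(statusStore: SQLAppStatusStore, executionId: Long): String =
  statusStore.execution(executionId) match {
    case Some(e) if e.errorMessage.isDefined  => s"FAILED: ${e.errorMessage.get}"
    case Some(e) if e.completionTime.nonEmpty => "COMPLETED"
    case Some(_)                              => "RUNNING"
    case None                                 => "unknown execution"
  }
```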

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala (4 additions, 1 deletion)

```diff
@@ -56,7 +56,10 @@ case class SparkListenerSQLExecutionStart(
 }

 @DeveloperApi
-case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
+case class SparkListenerSQLExecutionEnd(
+    executionId: Long,
+    time: Long,
+    errorMessage: Option[String] = None)
   extends SparkListenerEvent {

   // The name of the execution, e.g. `df.collect` will trigger a SQL execution with name "collect".
```
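Because `errorMessage` defaults to `None`, pre-existing producers of this event stay source-compatible; a small sketch:

```scala
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd

// Existing two-argument call sites keep compiling unchanged:
val legacy = SparkListenerSQLExecutionEnd(1, 10L) // errorMessage = None
val failed = SparkListenerSQLExecutionEnd(1, 10L,
  Some("""{"errorClass":"java.lang.Exception","messageParameters":{"message":"test"}}"""))
assert(legacy.errorMessage.isEmpty && failed.errorMessage.isDefined)
```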

sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala (40 additions, 4 deletions)

```diff
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution

 import org.json4s.jackson.JsonMethods._

-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkThrowableHelper}
 import org.apache.spark.scheduler.SparkListenerEvent
 import org.apache.spark.sql.LocalSparkSession
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
@@ -74,27 +74,60 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
   test("SparkListenerSQLExecutionEnd backward compatibility") {
     spark = new TestSparkSession()
     val qe = spark.sql("select 1").queryExecution
-    val event = SparkListenerSQLExecutionEnd(1, 10)
+    val errorMessage = SparkThrowableHelper.getMessage(new Exception("test"))
+    val event = SparkListenerSQLExecutionEnd(1, 10, Some(errorMessage))
     event.duration = 1000
     event.executionName = Some("test")
     event.qe = qe
-    event.executionFailure = Some(new RuntimeException("test"))
+    event.executionFailure = Some(new Exception("test"))
     val json = JsonProtocol.sparkEventToJsonString(event)
+    // scalastyle:off line.size.limit
     assert(parse(json) == parse(
       """
        |{
        |  "Event" : "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd",
        |  "executionId" : 1,
-       |  "time" : 10
+       |  "time" : 10,
+       |  "errorMessage" : "{\"errorClass\":\"java.lang.Exception\",\"messageParameters\":{\"message\":\"test\"}}"
        |}
      """.stripMargin))
+    // scalastyle:on
     val readBack = JsonProtocol.sparkEventFromJson(json)
     event.duration = 0
     event.executionName = None
     event.qe = null
     event.executionFailure = None
     assert(readBack == event)
   }
+
+  test("SPARK-40834: Use SparkListenerSQLExecutionEnd to track final SQL status in UI") {
+    // parse old event log using new SparkListenerSQLExecutionEnd
+    val executionEnd =
+      """
+       |{
+       |  "Event" : "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd",
+       |  "executionId" : 1,
+       |  "time" : 10
+       |}
+     """.stripMargin
+    val readBack = JsonProtocol.sparkEventFromJson(executionEnd)
+    assert(readBack == SparkListenerSQLExecutionEnd(1, 10))
+
+    // parse new event using old SparkListenerSQLExecutionEnd
+    // scalastyle:off line.size.limit
+    val newExecutionEnd =
+      """
+       |{
+       |  "Event" : "org.apache.spark.sql.execution.OldVersionSQLExecutionEnd",
+       |  "executionId" : 1,
+       |  "time" : 10,
+       |  "errorMessage" : "{\"errorClass\":\"java.lang.Exception\",\"messageParameters\":{\"message\":\"test\"}}"
+       |}
+     """.stripMargin
+    // scalastyle:on
+    val readBack2 = JsonProtocol.sparkEventFromJson(newExecutionEnd)
+    assert(readBack2 == OldVersionSQLExecutionEnd(1, 10))
+  }
 }

 private case class OldVersionSQLExecutionStart(
@@ -105,3 +138,6 @@ private case class OldVersionSQLExecutionStart(
     sparkPlanInfo: SparkPlanInfo,
     time: Long)
   extends SparkListenerEvent
+
+private case class OldVersionSQLExecutionEnd(executionId: Long, time: Long)
+  extends SparkListenerEvent
```

sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala (1 addition, 1 deletion)

```diff
@@ -115,7 +115,7 @@ class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndAfter {
       System.currentTimeMillis(),
       Map.empty))
     listener.onOtherEvent(SparkListenerSQLExecutionEnd(
-      executionId, System.currentTimeMillis()))
+      executionId, System.currentTimeMillis(), Some("Oops")))
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
       time = System.currentTimeMillis(),
```

sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala (25 additions, 0 deletions)

```diff
@@ -21,6 +21,7 @@ import java.util.Properties

 import scala.collection.mutable.{ArrayBuffer, ListBuffer}

+import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.json4s.jackson.JsonMethods._
@@ -980,6 +981,30 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
       }
     }
   }
+
+  test("SPARK-40834: Use SparkListenerSQLExecutionEnd to track final SQL status in UI") {
+    var received = false
+    spark.sparkContext.addSparkListener(new SparkListener {
+      override def onOtherEvent(event: SparkListenerEvent): Unit = {
+        event match {
+          case SparkListenerSQLExecutionEnd(_, _, Some(errorMessage)) =>
+            val error = new ObjectMapper().readTree(errorMessage)
+            assert(error.get("errorClass").toPrettyString === "\"java.lang.Exception\"")
+            assert(error.path("messageParameters").get("message").toPrettyString === "\"test\"")
+            received = true
+          case _ =>
+        }
+      }
+    })
+
+    intercept[Exception] {
+      SQLExecution.withNewExecutionId(spark.range(1).queryExecution) {
+        throw new Exception("test")
+      }
+    }
+    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+    assert(received)
+  }
 }

```
sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala (2 additions, 1 deletion)

```diff
@@ -93,7 +93,8 @@ object SqlResourceSuite {
       0 -> JobExecutionStatus.SUCCEEDED,
       1 -> JobExecutionStatus.SUCCEEDED),
     stages = Set[Int](),
-    metricValues = getMetricValues()
+    metricValues = getMetricValues(),
+    errorMessage = None
   )
 }
```
