Commit 29d184c

[SPARK-22861][SQL] SQLAppStatusListener handles multi-job executions.

When one execution has multiple jobs, we need to append to the set of stages, not replace it on every job start.

1 parent 9962390
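
For context, a minimal standalone sketch (Scala; not taken from the patch, names are illustrative) of why plain assignment loses stages when one execution runs several jobs, while set union accumulates them:

// Minimal sketch, not Spark code: `exec.stages` behaves like a
// mutable Set[Int] var on the live execution entity.
object StageSetDemo {
  def main(args: Array[String]): Unit = {
    var stages = Set.empty[Int]

    // Old behavior: each job start *replaces* the set,
    // so job 0's stages vanish when job 1 starts.
    stages = Set(0, 1) // job 0 starts
    stages = Set(2, 3) // job 1 starts
    assert(stages == Set(2, 3)) // stages 0 and 1 were dropped

    // Fixed behavior: `++=` takes the union, keeping all stages.
    stages = Set.empty[Int]
    stages ++= Set(0, 1) // job 0 starts
    stages ++= Set(2, 3) // job 1 starts
    assert(stages == Set(0, 1, 2, 3))
  }
}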

2 files changed: +41 −2 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala (1 addition, 2 deletions)

@@ -31,7 +31,6 @@ import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity}
 import org.apache.spark.status.config._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.kvstore.KVStore
 
 private[sql] class SQLAppStatusListener(
     conf: SparkConf,
@@ -88,7 +87,7 @@ private[sql] class SQLAppStatusListener(
     }
 
     exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
-    exec.stages = event.stageIds.toSet
+    exec.stages ++= event.stageIds.toSet
     update(exec)
   }
 
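To see the fixed line in context, here is a simplified, self-contained paraphrase of the job-start handling (the event and execution types are stand-ins, not the real Spark classes; only the last two statements of onJobStart mirror the diff):

// Sketch with stand-in types; assumes the listener keeps one live
// execution object per SQL execution and updates it on each job start.
object ListenerSketch {
  case class JobStart(jobId: Int, stageIds: Seq[Int])

  class LiveExecution {
    var jobs: Map[Int, String] = Map.empty
    var stages: Set[Int] = Set.empty
  }

  def onJobStart(exec: LiveExecution, event: JobStart): Unit = {
    exec.jobs = exec.jobs + (event.jobId -> "RUNNING")
    exec.stages ++= event.stageIds.toSet // union, as in the fix
  }

  def main(args: Array[String]): Unit = {
    val exec = new LiveExecution
    onJobStart(exec, JobStart(0, Seq(0, 1)))
    onJobStart(exec, JobStart(1, Seq(2, 3)))
    assert(exec.stages == Set(0, 1, 2, 3)) // all four stages tracked
  }
}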

sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala (40 additions, 0 deletions)

@@ -361,6 +361,46 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
     assertJobs(store.execution(0), failed = Seq(0))
   }
 
+  sqlStoreTest("handle one execution with multiple jobs") { (store, bus) =>
+    val executionId = 0
+    val df = createTestDataFrame
+    bus.postToAll(SparkListenerSQLExecutionStart(
+      executionId,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+
+    var stageId = 0
+    def twoStageJob(jobId: Int): Unit = {
+      val stages = Seq(stageId, stageId + 1).map { id => createStageInfo(id, 0) }
+      stageId += 2
+      bus.postToAll(SparkListenerJobStart(
+        jobId = jobId,
+        time = System.currentTimeMillis(),
+        stageInfos = stages,
+        createProperties(executionId)))
+      stages.foreach { s =>
+        bus.postToAll(SparkListenerStageSubmitted(s))
+        bus.postToAll(SparkListenerStageCompleted(s))
+      }
+      bus.postToAll(SparkListenerJobEnd(
+        jobId = jobId,
+        time = System.currentTimeMillis(),
+        JobSucceeded
+      ))
+    }
+    // submit two jobs with the same executionId
+    twoStageJob(0)
+    twoStageJob(1)
+    bus.postToAll(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
+
+    assertJobs(store.execution(0), completed = 0 to 1)
+    assert(store.execution(0).get.stages === (0 to 3).toSet)
+  }
+
   test("SPARK-11126: no memory leak when running non SQL jobs") {
     val previousStageNumber = statusStore.executionsList().size
     spark.sparkContext.parallelize(1 to 10).foreach(i => ())
