Skip to content

Commit 9c21ece

Browse files
Marcelo Vanzin authored and squito committed
[SPARK-22836][UI] Show driver logs in UI when available.
Port code from the old executors listener to the new one, so that the driver logs present in the application start event are kept. Author: Marcelo Vanzin <[email protected]> Closes #20038 from vanzin/SPARK-22836.
1 parent 8f6d573 commit 9c21ece

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -119,6 +119,17 @@ private[spark] class AppStatusListener(
119119

120120
kvstore.write(new ApplicationInfoWrapper(appInfo))
121121
kvstore.write(appSummary)
122+
123+
// Update the driver block manager with logs from this event. The SparkContext initialization
124+
// code registers the driver before this event is sent.
125+
event.driverLogs.foreach { logs =>
126+
val driver = liveExecutors.get(SparkContext.DRIVER_IDENTIFIER)
127+
.orElse(liveExecutors.get(SparkContext.LEGACY_DRIVER_IDENTIFIER))
128+
driver.foreach { d =>
129+
d.executorLogs = logs.toMap
130+
update(d, System.nanoTime())
131+
}
132+
}
122133
}
123134

124135
override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {

core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -942,6 +942,24 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
942942
}
943943
}
944944

945+
test("driver logs") {
946+
val listener = new AppStatusListener(store, conf, true)
947+
948+
val driver = BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "localhost", 42)
949+
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(time, driver, 42L))
950+
listener.onApplicationStart(SparkListenerApplicationStart(
951+
"name",
952+
Some("id"),
953+
time,
954+
"user",
955+
Some("attempt"),
956+
Some(Map("stdout" -> "file.txt"))))
957+
958+
check[ExecutorSummaryWrapper](SparkContext.DRIVER_IDENTIFIER) { d =>
959+
assert(d.info.executorLogs("stdout") === "file.txt")
960+
}
961+
}
962+
945963
private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId)
946964

947965
private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {

0 commit comments

Comments (0)