Skip to content

Commit 52641db

Browse files
author
Marcelo Vanzin
committed
[SPARK-22836][ui] Show driver logs in UI when available.
Port code from the old executors listener to the new one, so that the driver logs present in the application start event are kept.
1 parent 7570eab commit 52641db

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,17 @@ private[spark] class AppStatusListener(
114114

115115
kvstore.write(new ApplicationInfoWrapper(appInfo))
116116
kvstore.write(appSummary)
117+
118+
// Update the driver block manager with logs from this event. The SparkContext initialization
119+
// code registers the driver before this event is sent.
120+
event.driverLogs.foreach { logs =>
121+
val driver = liveExecutors.get(SparkContext.DRIVER_IDENTIFIER)
122+
.orElse(liveExecutors.get(SparkContext.LEGACY_DRIVER_IDENTIFIER))
123+
driver.foreach { d =>
124+
d.executorLogs = logs.toMap
125+
update(d, System.nanoTime())
126+
}
127+
}
117128
}
118129

119130
override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {

core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -939,6 +939,24 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
939939
}
940940
}
941941

942+
test("driver logs") {
  // Register the driver's block manager first, then deliver an application
  // start event that carries driver log paths; the listener is expected to
  // copy those logs onto the stored driver executor summary.
  val listener = new AppStatusListener(store, conf, true)

  val driverId = BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "localhost", 42)
  listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(time, driverId, 42L))

  val startEvent = SparkListenerApplicationStart(
    "name",
    Some("id"),
    time,
    "user",
    Some("attempt"),
    Some(Map("stdout" -> "file.txt")))
  listener.onApplicationStart(startEvent)

  // The driver's executor summary in the kvstore should now expose the log.
  check[ExecutorSummaryWrapper](SparkContext.DRIVER_IDENTIFIER) { d =>
    assert(d.info.executorLogs("stdout") === "file.txt")
  }
}
959+
942960
private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId)
943961

944962
private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {

0 commit comments

Comments (0)