
Commit 956c4c9

harishreedharan authored and squito committed
[SPARK-7657] [YARN] Add driver logs links in application UI, in cluster mode.
This PR adds the driver log URLs to the `SparkListenerApplicationStart` event; the `ExecutorsListener` later uses them to populate the driver log URLs in its own state, and that information is used when the UI is rendered to display links to the logs.

Author: Hari Shreedharan <[email protected]>

Closes #6166 from harishreedharan/am-log-link and squashes the following commits:

943fc4f [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into am-log-link
9e5c04b [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into am-log-link
b3f9b9d [Hari Shreedharan] Updated comment based on feedback.
0840a95 [Hari Shreedharan] Move the result and sc.stop back to original location, minor import changes.
537a2f7 [Hari Shreedharan] Add test to ensure the log urls are populated and valid.
4033725 [Hari Shreedharan] Adding comments explaining how node reports are used to get the log urls.
6c5c285 [Hari Shreedharan] Import order.
346f4ea [Hari Shreedharan] Review feedback fixes.
629c1dc [Hari Shreedharan] Cleanup.
99fb1a3 [Hari Shreedharan] Send the log urls in App start event, to ensure that other listeners are not affected.
c0de336 [Hari Shreedharan] Ensure new unit test cleans up after itself.
50cdae3 [Hari Shreedharan] Added unit test, made the approach generic.
402e8e4 [Hari Shreedharan] Use `NodeReport` to get the URL for the logs. Also, make the environment variables generic so other cluster managers can use them as well.
1cf338f [Hari Shreedharan] [SPARK-7657][YARN] Add driver link in application UI, in cluster mode.
1 parent 85b9637 commit 956c4c9

9 files changed (+136, −12 lines)
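To show how the new information can be consumed, here is a minimal sketch of a user-defined listener reading the driver log links from the application-start event. Only `SparkListenerApplicationStart.driverLogs` and the `onApplicationStart` callback come from this change; the listener class and its output are hypothetical.

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// Hypothetical listener, for illustration only: prints the driver log links, if any,
// that the scheduler backend attached to the application-start event.
class DriverLogLinkPrinter extends SparkListener {
  override def onApplicationStart(appStart: SparkListenerApplicationStart): Unit = {
    // driverLogs is None unless the backend (here, YARN cluster mode) supplies URLs.
    appStart.driverLogs.foreach { logs =>
      logs.foreach { case (name, url) => println(s"Driver $name log: $url") }
    }
  }
}

Such a listener could be registered through spark.extraListeners or SparkContext#addSparkListener, the same way the test changes below wire up SaveExecutorInfo.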

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -1991,7 +1991,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Note: this code assumes that the task scheduler has been initialized and has contacted
     // the cluster manager to get an application ID (in case the cluster manager provides one).
     listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
-      startTime, sparkUser, applicationAttemptId))
+      startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
   }

   /** Post the application end event */

core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala

Lines changed: 7 additions & 0 deletions
@@ -49,4 +49,11 @@ private[spark] trait SchedulerBackend {
    */
   def applicationAttemptId(): Option[String] = None

+  /**
+   * Get the URLs for the driver logs. These URLs are used to display the links in the UI
+   * Executors tab for the driver.
+   * @return Map containing the log names and their respective URLs
+   */
+  def getDriverLogUrls: Option[Map[String, String]] = None
+
 }
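The default returns None, so existing backends are unaffected. As a rough sketch of how another cluster manager could opt in, the backend below (hypothetical, not part of this commit, and necessarily inside the org.apache.spark.scheduler package since the trait is private[spark]) simply returns static links:

package org.apache.spark.scheduler

// Hypothetical backend, for illustration only: any SchedulerBackend can advertise
// driver log links by overriding the new getDriverLogUrls hook.
private[spark] class MyClusterSchedulerBackend extends SchedulerBackend {
  override def start(): Unit = {}
  override def stop(): Unit = {}
  override def reviveOffers(): Unit = {}
  override def defaultParallelism(): Int = 2

  // Assumed, made-up URLs; a real backend would ask its cluster manager where the driver runs.
  override def getDriverLogUrls: Option[Map[String, String]] = Some(Map(
    "stdout" -> "http://my-cluster-manager.example.com/driver/stdout",
    "stderr" -> "http://my-cluster-manager.example.com/driver/stderr"))
}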

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 7 additions & 2 deletions
@@ -110,8 +110,13 @@ case class SparkListenerExecutorMetricsUpdate(
   extends SparkListenerEvent

 @DeveloperApi
-case class SparkListenerApplicationStart(appName: String, appId: Option[String],
-  time: Long, sparkUser: String, appAttemptId: Option[String]) extends SparkListenerEvent
+case class SparkListenerApplicationStart(
+    appName: String,
+    appId: Option[String],
+    time: Long,
+    sparkUser: String,
+    appAttemptId: Option[String],
+    driverLogs: Option[Map[String, String]] = None) extends SparkListenerEvent

 @DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala

Lines changed: 11 additions & 1 deletion
@@ -19,7 +19,7 @@ package org.apache.spark.ui.exec

 import scala.collection.mutable.HashMap

-import org.apache.spark.ExceptionFailure
+import org.apache.spark.{ExceptionFailure, SparkContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
@@ -73,6 +73,16 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
     uiData.finishReason = Some(executorRemoved.reason)
   }

+  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
+    applicationStart.driverLogs.foreach { logs =>
+      val storageStatus = storageStatusList.find { s =>
+        s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
+          s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
+      }
+      storageStatus.foreach { s => executorToLogUrls(s.blockManagerId.executorId) = logs.toMap }
+    }
+  }
+
   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
     val eid = taskStart.taskInfo.executorId
     executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 4 additions & 2 deletions
@@ -196,7 +196,8 @@ private[spark] object JsonProtocol {
     ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
     ("Timestamp" -> applicationStart.time) ~
     ("User" -> applicationStart.sparkUser) ~
-    ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing))
+    ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) ~
+    ("Driver Logs" -> applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing))
   }

   def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
@@ -570,7 +571,8 @@ private[spark] object JsonProtocol {
     val time = (json \ "Timestamp").extract[Long]
     val sparkUser = (json \ "User").extract[String]
     val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String])
-    SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId)
+    val driverLogs = Utils.jsonOption(json \ "Driver Logs").map(mapFromJson)
+    SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId, driverLogs)
   }

   def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
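As a hedged illustration of the effect on event-log JSON, the sketch below serializes an application-start event and checks the round trip. All identifiers and URLs are made up, and because JsonProtocol is private[spark] such a snippet would have to live under the org.apache.spark package (for example in a test).

package org.apache.spark.example // hypothetical location, needed to reach private[spark] members

import org.apache.spark.scheduler.SparkListenerApplicationStart
import org.apache.spark.util.JsonProtocol

object DriverLogsJsonRoundTrip {
  def main(args: Array[String]): Unit = {
    // Made-up application, attempt and container identifiers.
    val event = SparkListenerApplicationStart(
      "example app",
      Some("application_0000000000000_0001"),
      0L,
      "someuser",
      Some("appattempt_0000000000000_0001_000001"),
      Some(Map(
        "stderr" -> "http://nm-host:8042/node/containerlogs/container_x/someuser/stderr?start=0",
        "stdout" -> "http://nm-host:8042/node/containerlogs/container_x/someuser/stdout?start=0")))

    // The JSON now carries a "Driver Logs" object mapping log names to URLs, and parsing it
    // back yields an equal event; when the field is absent, driverLogs comes back as None.
    val json = JsonProtocol.applicationStartToJson(event)
    assert(JsonProtocol.applicationStartFromJson(json) == event)
  }
}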

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala

Lines changed: 1 addition & 3 deletions
@@ -89,9 +89,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg

   /** Returns the attempt ID. */
   def getAttemptId(): ApplicationAttemptId = {
-    val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    containerId.getApplicationAttemptId()
+    YarnSparkHadoopUtil.get.getContainerId.getApplicationAttemptId()
   }

   /** Returns the configuration for the AmIpFilter to add to the Spark UI. */

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 6 additions & 1 deletion
@@ -33,7 +33,8 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
+import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
+import org.apache.hadoop.yarn.util.ConverterUtils

 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -136,6 +137,10 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     tokenRenewer.foreach(_.stop())
   }

+  private[spark] def getContainerId: ContainerId = {
+    val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+    ConverterUtils.toContainerId(containerIdString)
+  }
 }

 object YarnSparkHadoopUtil {

yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala

Lines changed: 76 additions & 1 deletion
@@ -17,10 +17,19 @@

 package org.apache.spark.scheduler.cluster

+import java.net.NetworkInterface
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.yarn.api.records.NodeState
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
 import org.apache.spark.SparkContext
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.IntParam
+import org.apache.spark.util.{IntParam, Utils}

 private[spark] class YarnClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -53,4 +62,70 @@ private[spark] class YarnClusterSchedulerBackend(
       logError("Application attempt ID is not set.")
       super.applicationAttemptId
   }
+
+  override def getDriverLogUrls: Option[Map[String, String]] = {
+    var yarnClientOpt: Option[YarnClient] = None
+    var driverLogs: Option[Map[String, String]] = None
+    try {
+      val yarnConf = new YarnConfiguration(sc.hadoopConfiguration)
+      val containerId = YarnSparkHadoopUtil.get.getContainerId
+      yarnClientOpt = Some(YarnClient.createYarnClient())
+      yarnClientOpt.foreach { yarnClient =>
+        yarnClient.init(yarnConf)
+        yarnClient.start()
+
+        // For newer versions of YARN, we can find the HTTP address for a given node by getting a
+        // container report for a given container. But container reports came only in Hadoop 2.4,
+        // so we basically have to get the node reports for all nodes and find the one which runs
+        // this container. For that we have to compare the node's host against the current host.
+        // Since the host can have multiple addresses, we need to compare against all of them to
+        // find out if one matches.
+
+        // Get all the addresses of this node.
+        val addresses =
+          NetworkInterface.getNetworkInterfaces.asScala
+            .flatMap(_.getInetAddresses.asScala)
+            .toSeq
+
+        // Find a node report that matches one of the addresses
+        val nodeReport =
+          yarnClient.getNodeReports(NodeState.RUNNING).asScala.find { x =>
+            val host = x.getNodeId.getHost
+            addresses.exists { address =>
+              address.getHostAddress == host ||
+                address.getHostName == host ||
+                address.getCanonicalHostName == host
+            }
+          }

+        // Now that we have found the report for the Node Manager that the AM is running on, we
+        // can get the base HTTP address for the Node manager from the report.
+        // The format used for the logs for each container is well-known and can be constructed
+        // using the NM's HTTP address and the container ID.
+        // The NM may be running several containers, but we can build the URL for the AM using
+        // the AM's container ID, which we already know.
+        nodeReport.foreach { report =>
+          val httpAddress = report.getHttpAddress
+          // lookup appropriate http scheme for container log urls
+          val yarnHttpPolicy = yarnConf.get(
+            YarnConfiguration.YARN_HTTP_POLICY_KEY,
+            YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
+          )
+          val user = Utils.getCurrentUserName()
+          val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
+          val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
+          logDebug(s"Base URL for logs: $baseUrl")
+          driverLogs = Some(
+            Map("stderr" -> s"$baseUrl/stderr?start=0", "stdout" -> s"$baseUrl/stdout?start=0"))
+        }
+      }
+    } catch {
+      case e: Exception =>
+        logInfo("Node Report API is not available in the version of YARN being used, so AM" +
+          " logs link will not appear in application UI", e)
+    } finally {
+      yarnClientOpt.foreach(_.close())
+    }
+    driverLogs
+  }
 }
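For reference, with made-up values for the Node Manager address, container ID and user, the URLs built above come out roughly as follows (sketch only; every value is hypothetical):

// Illustration only: all values below are made up.
val httpAddress = "nm-host.example.com:8042"                  // from the matching NodeReport
val containerId = "container_1431126331000_0001_01_000001"    // the AM's own container
val user = "someuser"                                         // Utils.getCurrentUserName()
val baseUrl = s"http://$httpAddress/node/containerlogs/$containerId/$user"

// These two links are what the driver row in the Executors tab points to:
//   stderr -> http://nm-host.example.com:8042/node/containerlogs/container_1431126331000_0001_01_000001/someuser/stderr?start=0
//   stdout -> the same base URL with /stdout?start=0
val driverLogs = Map(
  "stderr" -> s"$baseUrl/stderr?start=0",
  "stdout" -> s"$baseUrl/stdout?start=0")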

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala

Lines changed: 23 additions & 1 deletion
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit

 import scala.collection.JavaConversions._
 import scala.collection.mutable
+import scala.io.Source

 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.ByteStreams
@@ -33,7 +34,8 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}

 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListener, SparkListenerExecutorAdded}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
+  SparkListenerExecutorAdded}
 import org.apache.spark.util.Utils

 /**
@@ -290,10 +292,15 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit

 private[spark] class SaveExecutorInfo extends SparkListener {
   val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
+  var driverLogs: Option[collection.Map[String, String]] = None

   override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
     addedExecutorInfos(executor.executorId) = executor.executorInfo
   }
+
+  override def onApplicationStart(appStart: SparkListenerApplicationStart): Unit = {
+    driverLogs = appStart.driverLogs
+  }
 }

 private object YarnClusterDriver extends Logging with Matchers {
@@ -314,6 +321,7 @@ private object YarnClusterDriver extends Logging with Matchers {
     val sc = new SparkContext(new SparkConf()
       .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
       .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
+    val conf = sc.getConf
     val status = new File(args(0))
     var result = "failure"
     try {
@@ -335,6 +343,20 @@ private object YarnClusterDriver extends Logging with Matchers {
     executorInfos.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
     }
+
+    // If we are running in yarn-cluster mode, verify that driver logs are downloadable.
+    if (conf.get("spark.master") == "yarn-cluster") {
+      assert(listener.driverLogs.nonEmpty)
+      val driverLogs = listener.driverLogs.get
+      assert(driverLogs.size === 2)
+      assert(driverLogs.containsKey("stderr"))
+      assert(driverLogs.containsKey("stdout"))
+      val stderr = driverLogs("stderr") // YARN puts everything in stderr.
+      val lines = Source.fromURL(stderr).getLines()
+      // Look for a line that contains YarnClusterSchedulerBackend, since that is guaranteed in
+      // cluster mode.
+      assert(lines.exists(_.contains("YarnClusterSchedulerBackend")))
+    }
   }

 }
