Commit 21a5abc

add links to worker UIs in the executor page

1 parent e50efd5

10 files changed: +54, -15 lines changed

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 4 additions & 0 deletions
@@ -160,6 +160,10 @@ private[deploy] class ExecutorRunner(
       builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
       builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
 
+      // Add the worker URL and the worker web UI URL
+      builder.environment.put("SPARK_WORKER_URL", workerUrl)
+      builder.environment.put("SPARK_WORKER_UI_URL", s"http://$publicAddress:$webUiPort")
+
       process = builder.start()
       val header = "Spark Executor Command: %s\n%s\n\n".format(
         formattedCommand, "=" * 40)
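
For context, a minimal sketch (values hypothetical) of what the launched executor JVM now sees in its environment once ExecutorRunner has set the two variables above:

    // Inside the executor process:
    sys.env.get("SPARK_WORKER_URL")    // e.g. Some("spark://Worker@192.168.1.5:34567")
    sys.env.get("SPARK_WORKER_UI_URL") // e.g. Some("http://192.168.1.5:8081")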

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 8 additions & 1 deletion
@@ -58,7 +58,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
+      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
+        extractWorkerUrls))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) =>
@@ -75,6 +76,12 @@ private[spark] class CoarseGrainedExecutorBackend(
       .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
   }
 
+  def extractWorkerUrls: Map[String, String] = {
+    val prefix = "SPARK_WORKER_"
+    sys.env.filterKeys(_.startsWith(prefix))
+      .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
+  }
+
   override def receive: PartialFunction[Any, Unit] = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
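
extractWorkerUrls mirrors the existing extractLogUrls helper: it keeps only environment variables whose names start with SPARK_WORKER_, strips that prefix, and lowercases the remainder. A standalone sketch of the key derivation, using a stubbed map in place of sys.env (values hypothetical):

    val env = Map(
      "SPARK_WORKER_URL" -> "spark://Worker@192.168.1.5:34567",
      "SPARK_WORKER_UI_URL" -> "http://192.168.1.5:8081",
      "PATH" -> "/usr/bin")                                // unrelated vars are dropped

    val prefix = "SPARK_WORKER_"
    val workerUrls = env.filterKeys(_.startsWith(prefix))
      .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
    // workerUrls == Map("url" -> "spark://Worker@192.168.1.5:34567",
    //                   "ui_url" -> "http://192.168.1.5:8081")

The "ui_url" key produced here is exactly what ExecutorsPage looks up below when rendering the Worker column.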

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 2 additions & 1 deletion
@@ -51,7 +51,8 @@ private[spark] object CoarseGrainedClusterMessages {
       executorRef: RpcEndpointRef,
       hostname: String,
       cores: Int,
-      logUrls: Map[String, String])
+      logUrls: Map[String, String],
+      workerUrl: Map[String, String] = Map.empty)
     extends CoarseGrainedClusterMessage
 
   case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
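
The Map.empty default on the new workerUrl parameter keeps other RegisterExecutor call sites (e.g. in non-standalone cluster managers) compiling unchanged; only the standalone path shown above passes the new argument.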

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions
@@ -146,7 +146,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 
-    case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
+    case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, workerUrl) =>
       if (executorDataMap.contains(executorId)) {
         executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
         context.reply(true)
@@ -163,7 +163,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         totalCoreCount.addAndGet(cores)
         totalRegisteredExecutors.addAndGet(1)
         val data = new ExecutorData(executorRef, executorRef.address, hostname,
-          cores, cores, logUrls)
+          cores, cores, logUrls, workerUrl)
         // This must be synchronized because variables mutated
         // in this block are read when requesting executors
         CoarseGrainedSchedulerBackend.this.synchronized {

core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala

Lines changed: 3 additions & 2 deletions
@@ -34,5 +34,6 @@ private[cluster] class ExecutorData(
     override val executorHost: String,
     var freeCores: Int,
     override val totalCores: Int,
-    override val logUrlMap: Map[String, String]
-) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
+    override val logUrlMap: Map[String, String],
+    override val workerUrl: Map[String, String]
+) extends ExecutorInfo(executorHost, totalCores, logUrlMap, workerUrl)

core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala

Lines changed: 5 additions & 3 deletions
@@ -26,7 +26,8 @@ import org.apache.spark.annotation.DeveloperApi
 class ExecutorInfo(
     val executorHost: String,
     val totalCores: Int,
-    val logUrlMap: Map[String, String]) {
+    val logUrlMap: Map[String, String],
+    val workerUrl: Map[String, String] = Map.empty) {
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
 
@@ -35,12 +36,13 @@ class ExecutorInfo(
       (that canEqual this) &&
         executorHost == that.executorHost &&
         totalCores == that.totalCores &&
-        logUrlMap == that.logUrlMap
+        logUrlMap == that.logUrlMap &&
+        workerUrl == that.workerUrl
     case _ => false
   }
 
   override def hashCode(): Int = {
-    val state = Seq(executorHost, totalCores, logUrlMap)
+    val state = Seq(executorHost, totalCores, logUrlMap, workerUrl)
     state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
   }
 }
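
Since equals and hashCode now fold in workerUrl, two otherwise identical ExecutorInfo instances with different worker maps compare unequal. A small illustrative check (values hypothetical):

    val a = new ExecutorInfo("host1", 4, Map.empty, Map("ui_url" -> "http://host1:8081"))
    val b = new ExecutorInfo("host1", 4, Map.empty)   // workerUrl defaults to Map.empty
    assert(a != b)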

core/src/main/scala/org/apache/spark/status/api/v1/api.scala

Lines changed: 2 additions & 1 deletion
@@ -74,7 +74,8 @@ class ExecutorSummary private[spark](
     val totalShuffleRead: Long,
     val totalShuffleWrite: Long,
     val maxMemory: Long,
-    val executorLogs: Map[String, String])
+    val executorLogs: Map[String, String],
+    val worker: Map[String, String])
 
 class JobData private[spark](
     val jobId: Int,
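
Assuming the v1 status API's usual field-name serialization, executor summaries from /applications/[app-id]/executors would then expose the new map alongside executorLogs, e.g. (values hypothetical):

    "worker" : {
      "url" : "spark://Worker@192.168.1.5:34567",
      "ui_url" : "http://192.168.1.5:8081"
    }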

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala

Lines changed: 22 additions & 3 deletions
@@ -76,6 +76,7 @@ private[ui] class ExecutorsPage(
     implicit val idOrder = Ordering[Int].on((s: String) => idStrToInt(s)).reverse
     val execInfoSorted = execInfo.sortBy(_.id)
     val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
+    val workerExist = execInfo.exists(_.worker.nonEmpty)
 
     val execTable = {
       <table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
@@ -104,9 +105,10 @@ private[ui] class ExecutorsPage(
           </th>
           {if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty}
           {if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
+          {if (workerExist) <th class="sorttable_nosort">Worker</th> else Seq.empty}
         </thead>
         <tbody>
-          {execInfoSorted.map(execRow(_, logsExist))}
+          {execInfoSorted.map(execRow(_, logsExist, workerExist))}
         </tbody>
       </table>
     }
@@ -129,7 +131,8 @@ private[ui] class ExecutorsPage(
   }
 
   /** Render an HTML row representing an executor */
-  private def execRow(info: ExecutorSummary, logsExist: Boolean): Seq[Node] = {
+  private def execRow(info: ExecutorSummary, logsExist: Boolean, workerExist: Boolean)
+    : Seq[Node] = {
     val maximumMemory = info.maxMemory
     val memoryUsed = info.memoryUsed
     val diskUsed = info.diskUsed
@@ -195,6 +198,20 @@ private[ui] class ExecutorsPage(
           Seq.empty
         }
       }
+      {
+        if (workerExist) {
+          val uiUrl = info.worker.get("ui_url")
+          if (uiUrl.isDefined) {
+            <td>
+              <a href={uiUrl.get}>
+                Worker
+              </a>
+            </td>
+          } else {
+            <td> </td>
+          }
+        }
+      }
     </tr>
   }
 
@@ -365,6 +382,7 @@ private[spark] object ExecutorsPage {
     val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
    val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
     val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
+    val worker = listener.executorToWorkerUrls.getOrElse(execId, Map.empty)
 
     new ExecutorSummary(
       execId,
@@ -385,7 +403,8 @@ private[spark] object ExecutorsPage {
       totalShuffleRead,
       totalShuffleWrite,
       maxMem,
-      executorLogs
+      executorLogs,
+      worker
     )
   }
 }
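
Net effect in the UI: the Worker column is rendered only when at least one executor reports worker info (workerExist), and each row's cell links the text "Worker" to that executor's worker web UI, roughly:

    <td><a href="http://192.168.1.5:8081">Worker</a></td>   <!-- hypothetical URL -->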

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala

Lines changed: 2 additions & 0 deletions
@@ -59,6 +59,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
   val executorToShuffleRead = HashMap[String, Long]()
   val executorToShuffleWrite = HashMap[String, Long]()
   val executorToLogUrls = HashMap[String, Map[String, String]]()
+  val executorToWorkerUrls = HashMap[String, Map[String, String]]()
   val executorIdToData = HashMap[String, ExecutorUIData]()
 
   def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
@@ -68,6 +69,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
   override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
     val eid = executorAdded.executorId
     executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
+    executorToWorkerUrls(eid) = executorAdded.executorInfo.workerUrl
     executorToTotalCores(eid) = executorAdded.executorInfo.totalCores
     executorToTasksMax(eid) = executorToTotalCores(eid) / conf.getInt("spark.task.cpus", 1)
     executorIdToData(eid) = new ExecutorUIData(executorAdded.time)

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 4 additions & 2 deletions
@@ -440,7 +440,8 @@ private[spark] object JsonProtocol {
   def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
     ("Host" -> executorInfo.executorHost) ~
     ("Total Cores" -> executorInfo.totalCores) ~
-    ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
+    ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
+    ("Worker" -> mapToJson(executorInfo.workerUrl))
   }
 
   /** ------------------------------ *
@@ -934,7 +935,8 @@ private[spark] object JsonProtocol {
     val executorHost = (json \ "Host").extract[String]
     val totalCores = (json \ "Total Cores").extract[Int]
     val logUrls = mapFromJson(json \ "Log Urls").toMap
-    new ExecutorInfo(executorHost, totalCores, logUrls)
+    val worker = mapFromJson(json \ "Worker").toMap
+    new ExecutorInfo(executorHost, totalCores, logUrls, worker)
   }
 
   /** -------------------------------- *
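
A minimal round-trip sketch of the serialization change (this would have to live under the org.apache.spark package, since JsonProtocol is private[spark]; values hypothetical):

    import org.apache.spark.scheduler.cluster.ExecutorInfo
    import org.apache.spark.util.JsonProtocol

    val info = new ExecutorInfo("192.168.1.5", 4,
      Map("stdout" -> "http://192.168.1.5:8081/logPage?logType=stdout"),
      Map("url" -> "spark://Worker@192.168.1.5:34567", "ui_url" -> "http://192.168.1.5:8081"))

    val json = JsonProtocol.executorInfoToJson(info)      // now carries a "Worker" object
    assert(JsonProtocol.executorInfoFromJson(json) == info)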
