@@ -95,6 +95,7 @@ <h4 style="clear: left; display: inline-block;">Executors</h4>
Shuffle Write</span></th>
<th><span data-toggle="tooltip" data-placement="left" title="Logs">Logs</span></th>
<th><span data-toggle="tooltip" data-placement="left" title="Thread Dump">Thread Dump</span></th>
<th><span data-toggle="tooltip" data-placement="left" title="Worker">Worker</span></th>
</tr>
</thead>
<tbody>
@@ -126,6 +126,18 @@ function formatLogsCells(execLogs, type) {
return result;
}

function formatWorkersCells(worker, type) {
if ((!worker) || (worker['ui_url'] === undefined)) return "";
if (type !== 'display') return worker['ui_url'];
return '<td><a href="' + worker['ui_url'] + '">Worker</a></td>';
}

function workersExist(execs) {
return execs.some(function(exec) {
return !($.isEmptyObject(exec["worker"]));
});
}

function logsExist(execs) {
return execs.some(function(exec) {
return !($.isEmptyObject(exec["executorLogs"]));
@@ -408,7 +420,8 @@ $(document).ready(function () {
data: 'id', render: function (data, type) {
return type === 'display' ? ("<a href='threadDump/?executorId=" + data + "'>Thread Dump</a>" ) : data;
}
}
},
{data: 'worker', render: formatWorkersCells}
],
"columnDefs": [
{
@@ -421,6 +434,7 @@

var dt = $(selector).DataTable(conf);
dt.column(15).visible(logsExist(response));
dt.column(17).visible(workersExist(response));
$('#active-executors [data-toggle="tooltip"]').tooltip();

var sumSelector = "#summary-execs-table";
@@ -164,6 +164,10 @@ private[deploy] class ExecutorRunner(
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

// Expose the worker's URL and the worker's web UI URL to the executor
builder.environment.put("SPARK_WORKER_URL", workerUrl)
builder.environment.put("SPARK_WORKER_UI_URL", s"http://$publicAddress:$webUiPort")

process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
formattedCommand, "=" * 40)
@@ -59,7 +59,8 @@ private[spark] class CoarseGrainedExecutorBackend(
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
extractWorkerUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
@@ -75,6 +76,12 @@
.map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
}

def extractWorkerUrls: Map[String, String] = {
val prefix = "SPARK_WORKER_"
sys.env.filterKeys(_.startsWith(prefix))
.map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
}

override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
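
For reference, a minimal standalone sketch (not part of the patch) of the prefix-stripping that extractWorkerUrls performs; it assumes the executor environment was populated by ExecutorRunner with SPARK_WORKER_URL and SPARK_WORKER_UI_URL as in the hunk above, and reuses the host/port values from the JsonProtocolSuite fixture below.

object ExtractWorkerUrlsSketch {
  // Mirrors extractWorkerUrls: keep SPARK_WORKER_* variables, strip the prefix, lower-case the rest.
  def extractWorkerUrls(env: Map[String, String]): Map[String, String] = {
    val prefix = "SPARK_WORKER_"
    env.filter { case (k, _) => k.startsWith(prefix) }
      .map { case (k, v) => (k.substring(prefix.length).toLowerCase, v) }
  }

  def main(args: Array[String]): Unit = {
    val env = Map(
      "SPARK_WORKER_URL" -> "spark://Worker@someHost:8080",
      "SPARK_WORKER_UI_URL" -> "http://someHost:1234")
    // Prints Map(url -> spark://Worker@someHost:8080, ui_url -> http://someHost:1234):
    // the map that travels in RegisterExecutor and is later read as worker['url'] /
    // worker['ui_url'] by the executors page JavaScript above.
    println(extractWorkerUrls(env))
  }
}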
@@ -56,7 +56,8 @@ private[spark] object CoarseGrainedClusterMessages {
executorRef: RpcEndpointRef,
hostname: String,
cores: Int,
logUrls: Map[String, String])
logUrls: Map[String, String],
workerUrl: Map[String, String] = Map.empty)
extends CoarseGrainedClusterMessage

case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
@@ -149,7 +149,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, workerUrl) =>
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
@@ -166,7 +166,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val data = new ExecutorData(executorRef, executorRef.address, hostname,
cores, cores, logUrls)
cores, cores, logUrls, workerUrl)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
@@ -34,5 +34,18 @@ private[cluster] class ExecutorData(
override val executorHost: String,
var freeCores: Int,
override val totalCores: Int,
override val logUrlMap: Map[String, String]
) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
override val logUrlMap: Map[String, String],
override val workerUrl: Map[String, String]
) extends ExecutorInfo(executorHost, totalCores, logUrlMap, workerUrl) {

def this(
executorEndpoint: RpcEndpointRef,
executorAddress: RpcAddress,
executorHost: String,
freeCores: Int,
totalCores: Int,
logUrlMap: Map[String, String]) {
this(executorEndpoint, executorAddress, executorHost, freeCores, totalCores, logUrlMap,
Map[String, String]() )
}
}
@@ -26,7 +26,8 @@ import org.apache.spark.annotation.DeveloperApi
class ExecutorInfo(
val executorHost: String,
val totalCores: Int,
val logUrlMap: Map[String, String]) {
val logUrlMap: Map[String, String],
val workerUrl: Map[String, String] = Map.empty) {

def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]

@@ -35,12 +36,13 @@
(that canEqual this) &&
executorHost == that.executorHost &&
totalCores == that.totalCores &&
logUrlMap == that.logUrlMap
logUrlMap == that.logUrlMap &&
workerUrl == that.workerUrl
case _ => false
}

override def hashCode(): Int = {
val state = Seq(executorHost, totalCores, logUrlMap)
val state = Seq(executorHost, totalCores, logUrlMap, workerUrl)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}
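
A short usage sketch (not part of the patch) of the compatibility story above: workerUrl defaults to Map.empty, so pre-existing three-argument ExecutorInfo call sites (and the old six-argument ExecutorData constructor) still compile, while equals and hashCode now include the worker map. The values reuse the JsonProtocolSuite fixture below.

import org.apache.spark.scheduler.cluster.ExecutorInfo

object ExecutorInfoCompatSketch {
  def main(args: Array[String]): Unit = {
    val logUrls = Map("stderr" -> "mystderr", "stdout" -> "mystdout")
    val workerUrls = Map(
      "url" -> "spark://Worker@someHost:8080",
      "ui_url" -> "http://someHost:1234")

    val legacy = new ExecutorInfo("Hostee.awesome.com", 11, logUrls)      // pre-patch signature
    val withWorker = new ExecutorInfo("Hostee.awesome.com", 11, logUrls, workerUrls)

    assert(legacy.workerUrl.isEmpty)   // the new field defaults to an empty map
    assert(legacy != withWorker)       // workerUrl now participates in equals/hashCode
  }
}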
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -74,7 +74,8 @@ class ExecutorSummary private[spark](
val totalShuffleRead: Long,
val totalShuffleWrite: Long,
val maxMemory: Long,
val executorLogs: Map[String, String])
val executorLogs: Map[String, String],
val worker: Map[String, String])

class JobData private[spark](
val jobId: Int,
@@ -102,7 +102,8 @@ private[spark] object ExecutorsPage {
taskSummary.shuffleRead,
taskSummary.shuffleWrite,
maxMem,
taskSummary.executorLogs
taskSummary.executorLogs,
taskSummary.workerUrls
)
}
}
@@ -53,6 +53,7 @@ private[ui] case class ExecutorTaskSummary(
var shuffleRead: Long = 0L,
var shuffleWrite: Long = 0L,
var executorLogs: Map[String, String] = Map.empty,
var workerUrls: Map[String, String] = Map.empty,
var isAlive: Boolean = true
)

@@ -77,6 +78,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
val eid = executorAdded.executorId
val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap
taskSummary.workerUrls = executorAdded.executorInfo.workerUrl
taskSummary.totalCores = executorAdded.executorInfo.totalCores
taskSummary.tasksMax = taskSummary.totalCores / conf.getInt("spark.task.cpus", 1)
executorEvents += executorAdded
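
One way application code can consume the new field, sketched under the assumption that the patched ExecutorInfo.workerUrl is populated (it stays empty on cluster managers that never set SPARK_WORKER_*). WorkerUrlLogger is a hypothetical listener name; it would be registered with SparkContext.addSparkListener before executors come up.

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

class WorkerUrlLogger extends SparkListener {
  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
    // "ui_url" is the key produced by extractWorkerUrls from SPARK_WORKER_UI_URL.
    event.executorInfo.workerUrl.get("ui_url").foreach { url =>
      println(s"Executor ${event.executorId} is running on worker UI $url")
    }
  }
}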
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -443,7 +443,8 @@ private[spark] object JsonProtocol {
def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
("Host" -> executorInfo.executorHost) ~
("Total Cores" -> executorInfo.totalCores) ~
("Log Urls" -> mapToJson(executorInfo.logUrlMap))
("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
("Worker" -> mapToJson(executorInfo.workerUrl))
}

/** ------------------------------ *
@@ -967,7 +968,14 @@
val executorHost = (json \ "Host").extract[String]
val totalCores = (json \ "Total Cores").extract[Int]
val logUrls = mapFromJson(json \ "Log Urls").toMap
new ExecutorInfo(executorHost, totalCores, logUrls)
val workerOption = Utils.jsonOption(json \ "Worker")
workerOption match {
case Some(workerJson) =>
val worker = mapFromJson(workerJson).toMap
new ExecutorInfo(executorHost, totalCores, logUrls, worker)
case None =>
new ExecutorInfo(executorHost, totalCores, logUrls)
}
}

/** -------------------------------- *
@@ -17,5 +17,6 @@
"totalShuffleRead" : 0,
"totalShuffleWrite" : 13180,
"maxMemory" : 278302556,
"executorLogs" : { }
"executorLogs" : { },
"worker" : { }
} ]
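
The same map is what the executors REST endpoint (and hence the executors page JavaScript above) serves; a rough sketch of inspecting it against a running application, with placeholder host, port, and application id.

import scala.io.Source

object ExecutorsRestSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder UI address and app id; adjust to a real running application.
    val url = "http://localhost:4040/api/v1/applications/app-20161028000000-0000/executors"
    val body = Source.fromURL(url).mkString
    // Each array element now ends with an "executorLogs" object followed by a
    // "worker" object carrying "url" and "ui_url" entries, as in the fixture above.
    println(body)
  }
}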
@@ -74,13 +74,15 @@ class JsonProtocolSuite extends SparkFunSuite {
BlockManagerId("Scarce", "to be counted...", 100))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
val workerUrlMap = Map("url" -> "spark://Worker@someHost:8080",
"ui_url" -> "http://someHost:1234").toMap
val applicationStart = SparkListenerApplicationStart("The winner of all", Some("appId"),
42L, "Garfield", Some("appAttempt"))
val applicationStartWithLogs = SparkListenerApplicationStart("The winner of all", Some("appId"),
42L, "Garfield", Some("appAttempt"), Some(logUrlMap))
val applicationEnd = SparkListenerApplicationEnd(42L)
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, workerUrlMap))
val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
val executorMetricsUpdate = {
// Use custom accum ID for determinism
@@ -364,6 +366,16 @@
assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
}

test("ExecutorInfo backward compatibility") {
// "workerUrl" property of ExecutorInfo was added in 2.1.0
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
val executorInfo = new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap)
val oldExecutorInfoJson = JsonProtocol.executorInfoToJson(executorInfo)
.removeField({ _._1 == "Worker" })
val oldExecutorInfo = JsonProtocol.executorInfoFromJson(oldExecutorInfoJson)
assertEquals(executorInfo, oldExecutorInfo)
}

test("AccumulableInfo backward compatibility") {
// "Internal" property of AccumulableInfo was added in 1.5.1
val accumulableInfo = makeAccumulableInfo(1, internal = true, countFailedValues = true)
@@ -602,6 +614,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
private def assertEquals(info1: ExecutorInfo, info2: ExecutorInfo) {
assert(info1.executorHost == info2.executorHost)
assert(info1.totalCores == info2.totalCores)
assertEquals(info1.logUrlMap, info2.logUrlMap)
assertEquals(info1.workerUrl, info2.workerUrl)
}

private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
@@ -688,6 +702,17 @@
}
}

case class MapStringToString(m: Map[String, String])
implicit private def mapStringToString(m: Map[String, String]): MapStringToString =
MapStringToString(m)
private def assertEquals(details1: MapStringToString, details2: MapStringToString) {
assert(details1.m.keySet === details2.m.keySet)
details1.m.foreach {
case (key, value) => assert(details2.m(key) === value)
}
}

private def assertEquals(exception1: Exception, exception2: Exception) {
assert(exception1.getMessage === exception2.getMessage)
assertSeqEquals(
@@ -1759,6 +1784,10 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Log Urls" : {
| "stderr" : "mystderr",
| "stdout" : "mystdout"
| },
| "Worker" : {
| "url" : "spark://Worker@someHost:8080",
| "ui_url" : "http://someHost:1234"
| }
| }
|}
2 changes: 2 additions & 0 deletions project/MimaExcludes.scala
@@ -59,6 +59,8 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.Dataset.select"),
// [SPARK-16967] Move Mesos to Module
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX"),
// [SPARK-16520] [WEBUI] Link executors to corresponding worker pages
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.ExecutorInfo.this"),
// [SPARK-16240] ML persistence backward compatibility for LDA
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$"),
// [SPARK-17717] Add Find and Exists method to Catalog.