diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
index 64ea719141f4b..a1e2c3e355e6e 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
@@ -95,6 +95,7 @@ Executors
         <th>Shuffle Write</th>
         <th>Logs</th>
         <th>Thread Dump</th>
+        <th>Worker</th>
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
index fe5db6aa26b65..37d612b7d447e 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -126,6 +126,18 @@ function formatLogsCells(execLogs, type) {
     return result;
 }
 
+function formatWorkersCells(worker, type) {
+    if ((!worker) || (worker['ui_url'] === undefined)) return "";
+    if (type !== 'display') return worker['ui_url'];
+    return '<a href="' + worker['ui_url'] + '">Worker</a>';
+}
+
+function workersExist(execs) {
+    return execs.some(function(exec) {
+        return !($.isEmptyObject(exec["worker"]));
+    });
+}
+
 function logsExist(execs) {
     return execs.some(function(exec) {
         return !($.isEmptyObject(exec["executorLogs"]));
@@ -408,7 +420,8 @@ $(document).ready(function () {
                     data: 'id', render: function (data, type) {
                         return type === 'display' ? ("<a href='threadDump/?executorId=" + data + "'>Thread Dump</a>") : data;
                     }
-                }
+                },
+                {data: 'worker', render: formatWorkersCells}
             ],
             "columnDefs": [
                 {
@@ -421,6 +434,7 @@ $(document).ready(function () {
             var dt = $(selector).DataTable(conf);
             dt.column(15).visible(logsExist(response));
+            dt.column(17).visible(workersExist(response));
 
             $('#active-executors [data-toggle="tooltip"]').tooltip();
 
             var sumSelector = "#summary-execs-table";
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index d4d8521cc8204..aabd561b92cd4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -164,6 +164,10 @@ private[deploy] class ExecutorRunner(
     builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
     builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
 
+    // Expose the worker's RPC url and its webUI url to the executor
+    builder.environment.put("SPARK_WORKER_URL", workerUrl)
+    builder.environment.put("SPARK_WORKER_UI_URL", s"http://$publicAddress:$webUiPort")
+
     process = builder.start()
     val header = "Spark Executor Command: %s\n%s\n\n".format(
       formattedCommand, "=" * 40)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 92a27902c6696..a90ef1cd1d141 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -59,7 +59,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
+      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
+        extractWorkerUrls))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) =>
@@ -75,6 +76,12 @@ private[spark] class CoarseGrainedExecutorBackend(
       .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
   }
 
+  def extractWorkerUrls: Map[String, String] = {
+    val prefix = "SPARK_WORKER_"
+    sys.env.filterKeys(_.startsWith(prefix))
+      .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
+  }
+
   override def receive: PartialFunction[Any, Unit] = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
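Note: the two environment variables set in ExecutorRunner above surface under the keys "url" and "ui_url", because extractWorkerUrls strips the SPARK_WORKER_ prefix and lowercases the remainder; that is why executorspage.js reads worker['ui_url']. The following standalone sketch mirrors that transformation (the object name and the sample env values are hypothetical, not part of the patch):

object WorkerUrlExtractionSketch {
  // Same prefix-strip-and-lowercase logic as extractWorkerUrls above.
  def extractWorkerUrls(env: Map[String, String]): Map[String, String] = {
    val prefix = "SPARK_WORKER_"
    env.filterKeys(_.startsWith(prefix))
      .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val env = Map( // hypothetical values, stand-ins for the real executor env
      "SPARK_WORKER_URL" -> "spark://Worker@someHost:8080",
      "SPARK_WORKER_UI_URL" -> "http://someHost:8081",
      "SPARK_LOG_URL_STDOUT" -> "http://someHost:8081/logPage/stdout")
    // Yields Map(url -> spark://Worker@someHost:8080, ui_url -> http://someHost:8081);
    // the SPARK_LOG_URL_* variable is ignored because it lacks the prefix.
    println(extractWorkerUrls(env))
  }
}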
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 02803598097d9..6b2f9335a2d69 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -56,7 +56,8 @@ private[spark] object CoarseGrainedClusterMessages {
       executorRef: RpcEndpointRef,
       hostname: String,
       cores: Int,
-      logUrls: Map[String, String])
+      logUrls: Map[String, String],
+      workerUrl: Map[String, String] = Map.empty)
     extends CoarseGrainedClusterMessage
 
   case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 3452487e72e88..a7944020e1f2a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -149,7 +149,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 
-    case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
+    case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, workerUrl) =>
       if (executorDataMap.contains(executorId)) {
        executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
        context.reply(true)
@@ -166,7 +166,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         totalCoreCount.addAndGet(cores)
         totalRegisteredExecutors.addAndGet(1)
         val data = new ExecutorData(executorRef, executorRef.address, hostname,
-          cores, cores, logUrls)
+          cores, cores, logUrls, workerUrl)
         // This must be synchronized because variables mutated
         // in this block are read when requesting executors
         CoarseGrainedSchedulerBackend.this.synchronized {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index b25a4bfb501fb..18b7f05e01f42 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -34,5 +34,18 @@ private[cluster] class ExecutorData(
     override val executorHost: String,
     var freeCores: Int,
     override val totalCores: Int,
-    override val logUrlMap: Map[String, String]
-) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
+    override val logUrlMap: Map[String, String],
+    override val workerUrl: Map[String, String]
+) extends ExecutorInfo(executorHost, totalCores, logUrlMap, workerUrl) {
+
+  def this(
+      executorEndpoint: RpcEndpointRef,
+      executorAddress: RpcAddress,
+      executorHost: String,
+      freeCores: Int,
+      totalCores: Int,
+      logUrlMap: Map[String, String]) {
+    this(executorEndpoint, executorAddress, executorHost, freeCores, totalCores, logUrlMap,
+      Map[String, String]())
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
index 7f218566146a1..387d1fe70d2eb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
@@ -26,7 +26,8 @@ import org.apache.spark.annotation.DeveloperApi
 class ExecutorInfo(
     val executorHost: String,
     val totalCores: Int,
-    val logUrlMap: Map[String, String]) {
+    val logUrlMap: Map[String, String],
+    val workerUrl: Map[String, String] = Map.empty) {
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
 
@@ -35,12 +36,13 @@ class ExecutorInfo(
       (that canEqual this) &&
         executorHost == that.executorHost &&
         totalCores == that.totalCores &&
-        logUrlMap == that.logUrlMap
+        logUrlMap == that.logUrlMap &&
+        workerUrl == that.workerUrl
     case _ => false
   }
 
   override def hashCode(): Int = {
-    val state = Seq(executorHost, totalCores, logUrlMap)
+    val state = Seq(executorHost, totalCores, logUrlMap, workerUrl)
     state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
   }
 }
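Note: the new workerUrl field is added to equals and hashCode together, preserving the contract that equal ExecutorInfo instances must hash equally. A minimal standalone sketch of that contract, using a simplified stand-in class rather than the Spark class itself (names and values are illustrative):

// Simplified stand-in for ExecutorInfo, not the Spark class.
class Info(val host: String, val cores: Int,
    val worker: Map[String, String] = Map.empty) {
  def canEqual(other: Any): Boolean = other.isInstanceOf[Info]

  override def equals(other: Any): Boolean = other match {
    case that: Info =>
      (that canEqual this) && host == that.host &&
        cores == that.cores && worker == that.worker
    case _ => false
  }

  // Every field compared in equals also feeds hashCode.
  override def hashCode(): Int =
    Seq(host, cores, worker).map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}

object InfoEqualityCheck extends App {
  val withWorker = new Info("host1", 4, Map("ui_url" -> "http://host1:8081"))
  val withoutWorker = new Info("host1", 4)
  assert(withWorker != withoutWorker) // they differ only in the worker map
  assert(withWorker == new Info("host1", 4, Map("ui_url" -> "http://host1:8081")))
  assert(withWorker.hashCode ==
    new Info("host1", 4, Map("ui_url" -> "http://host1:8081")).hashCode)
}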
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 44a929b310384..bd58ca3ed8fa4 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -74,7 +74,8 @@ class ExecutorSummary private[spark](
     val totalShuffleRead: Long,
     val totalShuffleWrite: Long,
     val maxMemory: Long,
-    val executorLogs: Map[String, String])
+    val executorLogs: Map[String, String],
+    val worker: Map[String, String])
 
 class JobData private[spark](
     val jobId: Int,
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 7953d77fd7ece..63aa8f06e6e3a 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -102,7 +102,8 @@ private[spark] object ExecutorsPage {
       taskSummary.shuffleRead,
       taskSummary.shuffleWrite,
       maxMem,
-      taskSummary.executorLogs
+      taskSummary.executorLogs,
+      taskSummary.workerUrls
     )
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 678571fd4f5ac..05b6a90338226 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -53,6 +53,7 @@ private[ui] case class ExecutorTaskSummary(
     var shuffleRead: Long = 0L,
     var shuffleWrite: Long = 0L,
     var executorLogs: Map[String, String] = Map.empty,
+    var workerUrls: Map[String, String] = Map.empty,
     var isAlive: Boolean = true
 )
 
@@ -77,6 +78,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
     val eid = executorAdded.executorId
     val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
     taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap
+    taskSummary.workerUrls = executorAdded.executorInfo.workerUrl
     taskSummary.totalCores = executorAdded.executorInfo.totalCores
     taskSummary.tasksMax = taskSummary.totalCores / conf.getInt("spark.task.cpus", 1)
     executorEvents += executorAdded
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 4b4d2d10cbf8d..b919c87eb082a 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -443,7 +443,8 @@ private[spark] object JsonProtocol {
   def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
     ("Host" -> executorInfo.executorHost) ~
     ("Total Cores" -> executorInfo.totalCores) ~
-    ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
+    ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
+    ("Worker" -> mapToJson(executorInfo.workerUrl))
   }
 
   /** ------------------------------ *
@@ -967,7 +968,14 @@ private[spark] object JsonProtocol {
     val executorHost = (json \ "Host").extract[String]
     val totalCores = (json \ "Total Cores").extract[Int]
     val logUrls = mapFromJson(json \ "Log Urls").toMap
-    new ExecutorInfo(executorHost, totalCores, logUrls)
+    val workerOption = Utils.jsonOption(json \ "Worker")
+    workerOption match {
+      case Some(workerJson) =>
+        val worker = mapFromJson(workerJson).toMap
+        new ExecutorInfo(executorHost, totalCores, logUrls, worker)
+      case None =>
+        new ExecutorInfo(executorHost, totalCores, logUrls)
+    }
   }
 
   /** -------------------------------- *
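Note: executorInfoFromJson keeps old event logs readable: Utils.jsonOption maps the JNothing produced by a missing "Worker" field to None, which falls back to an empty worker map. A minimal sketch of the same fallback, assuming json4s on the classpath (the library Spark's JsonProtocol is built on); the object name and JSON literals are illustrative:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object WorkerJsonFallbackSketch {
  // A missing field parses to JNothing, so the wildcard branch below
  // plays the role of the None case in executorInfoFromJson above.
  def workerMap(json: JValue): Map[String, String] = json \ "Worker" match {
    case JObject(fields) =>
      fields.collect { case (k, JString(v)) => (k, v) }.toMap
    case _ => Map.empty
  }

  def main(args: Array[String]): Unit = {
    val oldEvent = parse("""{"Host": "h1", "Total Cores": 4, "Log Urls": {}}""")
    val newEvent = parse(
      """{"Host": "h1", "Total Cores": 4, "Log Urls": {},
        |"Worker": {"ui_url": "http://h1:8081"}}""".stripMargin)
    assert(workerMap(oldEvent).isEmpty)
    assert(workerMap(newEvent) == Map("ui_url" -> "http://h1:8081"))
  }
}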
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
index e7db6742c25e1..040553c6382e6 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
@@ -17,5 +17,6 @@
   "totalShuffleRead" : 0,
   "totalShuffleWrite" : 13180,
   "maxMemory" : 278302556,
-  "executorLogs" : { }
+  "executorLogs" : { },
+  "worker" : { }
 } ]
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 85da79180fd0b..2f358e0901763 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -74,13 +74,15 @@ class JsonProtocolSuite extends SparkFunSuite {
     BlockManagerId("Scarce", "to be counted...", 100))
   val unpersistRdd = SparkListenerUnpersistRDD(12345)
   val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
+  val workerUrlMap = Map("url" -> "spark://Worker@someHost:8080",
+    "ui_url" -> "http://someHost:1234").toMap
   val applicationStart = SparkListenerApplicationStart("The winner of all", Some("appId"), 42L,
     "Garfield", Some("appAttempt"))
   val applicationStartWithLogs = SparkListenerApplicationStart("The winner of all", Some("appId"),
     42L, "Garfield", Some("appAttempt"), Some(logUrlMap))
   val applicationEnd = SparkListenerApplicationEnd(42L)
   val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
-    new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
+    new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, workerUrlMap))
   val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
   val executorMetricsUpdate = {
     // Use custom accum ID for determinism
@@ -364,6 +366,16 @@ class JsonProtocolSuite extends SparkFunSuite {
     assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
   }
 
+  test("ExecutorInfo backward compatibility") {
+    // "workerUrl" property of ExecutorInfo was added in 2.1.0
+    val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
+    val executorInfo = new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap)
+    val oldExecutorInfoJson = JsonProtocol.executorInfoToJson(executorInfo)
+      .removeField({ _._1 == "Worker" })
+    val oldExecutorInfo = JsonProtocol.executorInfoFromJson(oldExecutorInfoJson)
+    assertEquals(executorInfo, oldExecutorInfo)
+  }
+
   test("AccumulableInfo backward compatibility") {
     // "Internal" property of AccumulableInfo was added in 1.5.1
     val accumulableInfo = makeAccumulableInfo(1, internal = true, countFailedValues = true)
@@ -602,6 +614,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
   private def assertEquals(info1: ExecutorInfo, info2: ExecutorInfo) {
     assert(info1.executorHost == info2.executorHost)
     assert(info1.totalCores == info2.totalCores)
+    assertEquals(info1.logUrlMap, info2.logUrlMap)
+    assertEquals(info1.workerUrl, info2.workerUrl)
   }
 
   private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
@@ -688,6 +702,17 @@ private[spark] object JsonProtocolSuite extends Assertions {
     }
   }
 
+  case class MapStringToString(m: Map[String, String])
+  implicit private def mapStringToString(m: Map[String, String]): MapStringToString =
+    MapStringToString(m)
+  private def assertEquals(details1: MapStringToString, details2: MapStringToString) {
+    assert(details1.m.size === details2.m.size)
+    details1.m.zip(details2.m).foreach {
+      case ((key1, value1), (key2, value2)) =>
+        assert(key1 === key2)
+        assert(value1 === value2)
+    }
+  }
+
   private def assertEquals(exception1: Exception, exception2: Exception) {
     assert(exception1.getMessage === exception2.getMessage)
     assertSeqEquals(
@@ -1759,6 +1784,10 @@ private[spark] object JsonProtocolSuite extends Assertions {
      |    "Log Urls" : {
      |      "stderr" : "mystderr",
      |      "stdout" : "mystdout"
+      |    },
+      |    "Worker" : {
+      |      "url" : "spark://Worker@someHost:8080",
+      |      "ui_url" : "http://someHost:1234"
      |    }
      |  }
      |}
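Note: the backward-compatibility test above simulates a pre-2.1.0 event log by generating current JSON and deleting the new field, rather than keeping a stale fixture file around. The same json4s removeField technique in isolation (the object name and field values are illustrative):

import org.json4s._
import org.json4s.jackson.JsonMethods.{compact, parse, render}

object RemoveFieldSketch {
  def main(args: Array[String]): Unit = {
    val json = parse("""{"Host": "h1", "Worker": {"ui_url": "http://h1:8081"}}""")
    // removeField takes a predicate over (name, value) pairs and drops matches.
    val oldStyle = json.removeField { case (name, _) => name == "Worker" }
    println(compact(render(oldStyle))) // prints {"Host":"h1"}
  }
}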
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 2314d7f45cb21..88072e3502d20 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -59,6 +59,8 @@ object MimaExcludes {
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.Dataset.select"),
       // [SPARK-16967] Move Mesos to Module
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX"),
+      // [SPARK-16520] [WEBUI] Link executors to corresponding worker pages
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.ExecutorInfo.this"),
       // [SPARK-16240] ML persistence backward compatibility for LDA
       ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$"),
       // [SPARK-17717] Add Find and Exists method to Catalog.
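Note on the MiMa exclusion: a Scala default argument is resolved at the call site during compilation, so adding the workerUrl parameter removes the old three-argument constructor from the compiled class and breaks binary compatibility, which is what DirectMissingMethodProblem on ExecutorInfo.this flags. Source compatibility survives: both call sites in this sketch (hostnames and URLs are illustrative) would compile against the new class.

import org.apache.spark.scheduler.cluster.ExecutorInfo

object ConstructorCompatibilityCheck {
  // Old-style call site: still compiles because workerUrl defaults to Map.empty.
  val oldStyle = new ExecutorInfo("host1", 4, Map("stdout" -> "http://host1/stdout"))
  // New-style call site passing the worker map explicitly.
  val newStyle = new ExecutorInfo("host1", 4, Map.empty,
    Map("url" -> "spark://Worker@host1:8080", "ui_url" -> "http://host1:8081"))
}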