From 21a5abc873c69f306514eaf0130afe96bb8db9d9 Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Thu, 14 Jul 2016 23:13:59 +0800
Subject: [PATCH 01/12] add links to worker UIs in the executor page

---
 .../spark/deploy/worker/ExecutorRunner.scala    |  4 +++
 .../CoarseGrainedExecutorBackend.scala          |  9 ++++++-
 .../cluster/CoarseGrainedClusterMessage.scala   |  3 ++-
 .../CoarseGrainedSchedulerBackend.scala         |  4 +--
 .../scheduler/cluster/ExecutorData.scala        |  5 ++--
 .../scheduler/cluster/ExecutorInfo.scala        |  8 +++---
 .../org/apache/spark/status/api/v1/api.scala    |  3 ++-
 .../apache/spark/ui/exec/ExecutorsPage.scala    | 25 ++++++++++++++++---
 .../apache/spark/ui/exec/ExecutorsTab.scala     |  2 ++
 .../org/apache/spark/util/JsonProtocol.scala    |  6 +++--
 10 files changed, 54 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 06066248ea5d..10d6c412a58b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -160,6 +160,10 @@ private[deploy] class ExecutorRunner(
     builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
     builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
 
+    // Add the worker URL and the worker webUI URL
+    builder.environment.put("SPARK_WORKER_URL", workerUrl)
+    builder.environment.put("SPARK_WORKER_UI_URL", s"http://$publicAddress:$webUiPort")
+
     process = builder.start()
     val header = "Spark Executor Command: %s\n%s\n\n".format(
       formattedCommand, "=" * 40)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ccc6c36e9c79..74a436b9db80 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -58,7 +58,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
+      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
+        extractWorkerUrls))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) =>
@@ -75,6 +76,12 @@ private[spark] class CoarseGrainedExecutorBackend(
       .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
   }
 
+  def extractWorkerUrls: Map[String, String] = {
+    val prefix = "SPARK_WORKER_"
+    sys.env.filterKeys(_.startsWith(prefix))
+      .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
+  }
+
   override def receive: PartialFunction[Any, Unit] = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index edc8aac5d151..9cc49530141b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -51,7 +51,8 @@ private[spark] object CoarseGrainedClusterMessages {
       executorRef: RpcEndpointRef,
       hostname: String,
       cores: Int,
-      logUrls: Map[String, String])
+      logUrls: Map[String, String],
+      workerUrl: Map[String, String] = Map.empty)
     extends CoarseGrainedClusterMessage
 
   case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 8259923ce31c..d5aa1cf5b81e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -146,7 +146,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 
-    case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
+    case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, workerUrl) =>
       if (executorDataMap.contains(executorId)) {
         executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
         context.reply(true)
@@ -163,7 +163,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         totalCoreCount.addAndGet(cores)
         totalRegisteredExecutors.addAndGet(1)
         val data = new ExecutorData(executorRef, executorRef.address, hostname,
-          cores, cores, logUrls)
+          cores, cores, logUrls, workerUrl)
         // This must be synchronized because variables mutated
         // in this block are read when requesting executors
         CoarseGrainedSchedulerBackend.this.synchronized {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index b25a4bfb501f..d5d9a960e341 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -34,5 +34,6 @@ private[cluster] class ExecutorData(
     override val executorHost: String,
     var freeCores: Int,
     override val totalCores: Int,
-    override val logUrlMap: Map[String, String]
-) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
+    override val logUrlMap: Map[String, String],
+    override val workerUrl: Map[String, String]
+) extends ExecutorInfo(executorHost, totalCores, logUrlMap, workerUrl)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
index 7f218566146a..387d1fe70d2e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
@@ -26,7 +26,8 @@ import org.apache.spark.annotation.DeveloperApi
 class ExecutorInfo(
    val executorHost: String,
    val totalCores: Int,
-   val logUrlMap: Map[String, String]) {
+   val logUrlMap: Map[String, String],
+   val workerUrl: Map[String, String] = Map.empty) {
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
 
@@ -35,12 +36,13 @@ class ExecutorInfo(
       (that canEqual this) &&
         executorHost == that.executorHost &&
         totalCores == that.totalCores &&
-        logUrlMap == that.logUrlMap
+        logUrlMap == that.logUrlMap &&
+        workerUrl == that.workerUrl
     case _ => false
   }
 
   override def hashCode(): Int = {
-    val state = Seq(executorHost, totalCores, logUrlMap)
+    val state = Seq(executorHost, totalCores, logUrlMap, workerUrl)
     state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 32e332a9adb9..f94bb3864cff 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -74,7 +74,8 @@ class ExecutorSummary private[spark](
     val totalShuffleRead: Long,
     val totalShuffleWrite: Long,
     val maxMemory: Long,
-    val executorLogs: Map[String, String])
+    val executorLogs: Map[String, String],
+    val worker: Map[String, String])
 
 class JobData private[spark](
     val jobId: Int,
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 67deb7b14bcb..258162e689e8 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -76,6 +76,7 @@ private[ui] class ExecutorsPage(
     implicit val idOrder = Ordering[Int].on((s: String) => idStrToInt(s)).reverse
     val execInfoSorted = execInfo.sortBy(_.id)
     val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
+    val workerExist = execInfo.exists(_.worker.nonEmpty)
 
     val execTable = {
@@ -104,9 +105,10 @@ private[ui] class ExecutorsPage(
         {if (logsExist) <th>Logs</th> else Seq.empty}
         {if (threadDumpEnabled) <th>Thread Dump</th> else Seq.empty}
+        {if (workerExist) <th>Worker</th> else Seq.empty}
       </thead>
       <tbody>
-        {execInfoSorted.map(execRow(_, logsExist))}
+        {execInfoSorted.map(execRow(_, logsExist, workerExist))}
       </tbody>
     </table>
     }
@@ -129,7 +131,8 @@ private[ui] class ExecutorsPage(
   }
 
   /** Render an HTML row representing an executor */
-  private def execRow(info: ExecutorSummary, logsExist: Boolean): Seq[Node] = {
+  private def execRow(info: ExecutorSummary, logsExist: Boolean, workerExist: Boolean)
+    : Seq[Node] = {
     val maximumMemory = info.maxMemory
     val memoryUsed = info.memoryUsed
     val diskUsed = info.diskUsed
@@ -195,6 +198,20 @@ private[ui] class ExecutorsPage(
           Seq.empty
         }
       }
+      {
+        if (workerExist) {
+          val uiUrl = info.worker.get("ui_url")
+          if (uiUrl.isDefined) {
+            <td>
+              <a href={uiUrl.get}>
+                Worker
+              </a>
+            </td>
+          } else {
+            <td></td>
+          }
+        }
+      }
     </tr>
   }
@@ -365,6 +382,7 @@ private[spark] object ExecutorsPage {
     val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
     val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
     val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
+    val worker = listener.executorToWorkerUrls.getOrElse(execId, Map.empty)
 
     new ExecutorSummary(
       execId,
@@ -385,7 +403,8 @@ private[spark] object ExecutorsPage {
       totalShuffleRead,
       totalShuffleWrite,
       maxMem,
-      executorLogs
+      executorLogs,
+      worker
     )
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 676f4457510c..c3b9cb0cd887 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -59,6 +59,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
   val executorToShuffleRead = HashMap[String, Long]()
   val executorToShuffleWrite = HashMap[String, Long]()
   val executorToLogUrls = HashMap[String, Map[String, String]]()
+  val executorToWorkerUrls = HashMap[String, Map[String, String]]()
   val executorIdToData = HashMap[String, ExecutorUIData]()
 
   def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
@@ -68,6 +69,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
   override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
     val eid = executorAdded.executorId
     executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
+    executorToWorkerUrls(eid) = executorAdded.executorInfo.workerUrl
     executorToTotalCores(eid) = executorAdded.executorInfo.totalCores
     executorToTasksMax(eid) = executorToTotalCores(eid) / conf.getInt("spark.task.cpus", 1)
     executorIdToData(eid) = new ExecutorUIData(executorAdded.time)
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 022b22689410..ab8b732432ce 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -440,7 +440,8 @@ private[spark] object JsonProtocol {
   def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
     ("Host" -> executorInfo.executorHost) ~
     ("Total Cores" -> executorInfo.totalCores) ~
-    ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
+    ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
+    ("Worker" -> mapToJson(executorInfo.workerUrl))
   }
 
   /** ------------------------------ *
@@ -934,7 +935,8 @@ private[spark] object JsonProtocol {
     val executorHost = (json \ "Host").extract[String]
     val totalCores = (json \ "Total Cores").extract[Int]
     val logUrls = mapFromJson(json \ "Log Urls").toMap
-    new ExecutorInfo(executorHost, totalCores, logUrls)
+    val worker = mapFromJson(json \ "Worker").toMap
+    new ExecutorInfo(executorHost, totalCores, logUrls, worker)
   }
 
   /** -------------------------------- *

From 4d1d47fd9c8c4909d182e963c33c064c5bafb3e2 Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Wed, 20 Jul 2016 10:56:14 +0800
Subject: [PATCH 02/12] mima excludes

---
 project/MimaExcludes.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 56061559feff..30683d7ec7e5 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -39,6 +39,10 @@ object MimaExcludes {
     Seq(
       // [SPARK-16199][SQL] Add a method to list the referenced columns in data source Filter
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.Filter.references")
+    ) ++
+    Seq(
+      // [SPARK-16520] [WEBUI] Link executors to corresponding worker pages
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.ExecutorInfo.this")
     )
   }

From 1618ffe4b49f6ed3a16933f42cfb7e2ebac6ec40 Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Fri, 22 Jul 2016 07:09:19 +0800
Subject: [PATCH 03/12] Fix for the unit test

---
 .../executor_list_json_expectation.json                      | 3 ++-
 .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 4 +++-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
index e7db6742c25e..040553c6382e 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
@@ -17,5 +17,6 @@
   "totalShuffleRead" : 0,
   "totalShuffleWrite" : 13180,
   "maxMemory" : 278302556,
-  "executorLogs" : { }
+  "executorLogs" : { },
+  "worker" : { }
 } ]
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 85ca9d39d4a3..3570a6866c84 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -73,13 +73,15 @@ class JsonProtocolSuite extends SparkFunSuite {
     BlockManagerId("Scarce", "to be counted...", 100))
   val unpersistRdd = SparkListenerUnpersistRDD(12345)
   val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
+  val workerUrlMap = Map("url" -> "spark://Worker@192.168.1.104:32790",
+    "ui_url" -> "http://192.168.1.104:46445").toMap
   val applicationStart = SparkListenerApplicationStart("The winner of all", Some("appId"), 42L,
     "Garfield", Some("appAttempt"))
   val applicationStartWithLogs = SparkListenerApplicationStart("The winner of all", Some("appId"),
     42L, "Garfield", Some("appAttempt"), Some(logUrlMap))
   val applicationEnd = SparkListenerApplicationEnd(42L)
   val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
-    new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
+    new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, workerUrlMap))
   val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
   val executorMetricsUpdate = {
     // Use custom accum ID for determinism

From 519c32974d3239d5780fb94064126b4a0737c656 Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Fri, 22 Jul 2016 09:31:38 +0800
Subject: [PATCH 04/12] Fix JsonProtocolSuite

---
 .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 3570a6866c84..3195a298f612 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -1751,6 +1751,10 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |    "Log Urls" : {
       |      "stderr" : "mystderr",
       |      "stdout" : "mystdout"
+      |    },
+      |    "Worker" : {
+      |      "url" : "spark://Worker@192.168.1.104:32790",
+      |      "ui_url" : "http://192.168.1.104:46445"
       |    }
       |  }
       |}

From 48ae86f630465ff134da792eeba2007966b5cecf Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Fri, 22 Jul 2016 13:54:14 +0800
Subject: [PATCH 05/12] make the test case more generic

---
 .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 3195a298f612..c6c8e635b24b 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -73,8 +73,8 @@ class JsonProtocolSuite extends SparkFunSuite {
     BlockManagerId("Scarce", "to be counted...", 100))
   val unpersistRdd = SparkListenerUnpersistRDD(12345)
   val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
-  val workerUrlMap = Map("url" -> "spark://Worker@192.168.1.104:32790",
-    "ui_url" -> "http://192.168.1.104:46445").toMap
+  val workerUrlMap = Map("url" -> "spark://Worker@someHost:8080",
+    "ui_url" -> "http://someHost:1234").toMap
   val applicationStart = SparkListenerApplicationStart("The winner of all", Some("appId"), 42L,
     "Garfield", Some("appAttempt"))
   val applicationStartWithLogs = SparkListenerApplicationStart("The winner of all", Some("appId"),
@@ -1753,8 +1753,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |      "stdout" : "mystdout"
       |    },
       |    "Worker" : {
-      |      "url" : "spark://Worker@192.168.1.104:32790",
-      |      "ui_url" : "http://192.168.1.104:46445"
+      |      "url" : "spark://Worker@someHost:8080",
+      |      "ui_url" : "http://someHost:1234"
       |    }
       |  }
       |}

From 480830646c9b74a3f7a4be1a1e43e0651da5b6bb Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Tue, 26 Jul 2016 19:49:57 +0800
Subject: [PATCH 06/12] Keep the backward compatibility of ExecutorInfo

---
 .../scheduler/cluster/ExecutorData.scala      | 14 ++++++++++-
 .../org/apache/spark/util/JsonProtocol.scala  | 10 ++++++--
 .../apache/spark/util/JsonProtocolSuite.scala | 23 +++++++++++++++++++
 3 files changed, 44 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index d5d9a960e341..18b7f05e01f4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -36,4 +36,16 @@ private[cluster] class ExecutorData(
     override val totalCores: Int,
     override val logUrlMap: Map[String, String],
     override val workerUrl: Map[String, String]
-) extends ExecutorInfo(executorHost, totalCores, logUrlMap, workerUrl)
+) extends ExecutorInfo(executorHost, totalCores, logUrlMap, workerUrl) {
+
+  def this(
+      executorEndpoint: RpcEndpointRef,
+      executorAddress: RpcAddress,
+      executorHost: String,
+      freeCores: Int,
+      totalCores: Int,
+      logUrlMap: Map[String, String]) {
+    this(executorEndpoint, executorAddress, executorHost, freeCores, totalCores, logUrlMap,
+      Map[String, String]() )
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ab8b732432ce..1fa466079066 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -935,8 +935,14 @@ private[spark] object JsonProtocol {
     val executorHost = (json \ "Host").extract[String]
     val totalCores = (json \ "Total Cores").extract[Int]
     val logUrls = mapFromJson(json \ "Log Urls").toMap
-    val worker = mapFromJson(json \ "Worker").toMap
-    new ExecutorInfo(executorHost, totalCores, logUrls, worker)
+    val workerOption = Utils.jsonOption(json \ "Worker")
+    workerOption match {
+      case Some(workerJson) =>
+        val worker = mapFromJson(workerJson).toMap
+        new ExecutorInfo(executorHost, totalCores, logUrls, worker)
+      case None =>
+        new ExecutorInfo(executorHost, totalCores, logUrls)
+    }
   }
 
   /** -------------------------------- *
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index c6c8e635b24b..ee02f7c056b4 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -365,6 +365,16 @@ class JsonProtocolSuite extends SparkFunSuite {
     assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
   }
 
+  test("ExecutorInfo backward compatibility") {
+    // "workerUrl" property of ExecutorInfo was added in 2.1.0
+    val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
+    val executorInfo = new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap)
+    val oldExecutorInfoJson = JsonProtocol.executorInfoToJson(executorInfo)
+      .removeField({ _._1 == "Worker" })
+    val oldExecutorInfo = JsonProtocol.executorInfoFromJson(oldExecutorInfoJson)
+    assertEquals(executorInfo, oldExecutorInfo)
+  }
+
   test("AccumulableInfo backward compatibility") {
     // "Internal" property of AccumulableInfo was added in 1.5.1
     val accumulableInfo = makeAccumulableInfo(1, internal = true, countFailedValues = true)
@@ -603,6 +613,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
   private def assertEquals(info1: ExecutorInfo, info2: ExecutorInfo) {
     assert(info1.executorHost == info2.executorHost)
     assert(info1.totalCores == info2.totalCores)
+    assertEquals(info1.logUrlMap, info2.logUrlMap)
+    assertEquals(info1.workerUrl, info2.workerUrl)
   }
 
   private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
@@ -686,6 +698,17 @@ private[spark] object JsonProtocolSuite extends Assertions {
     }
   }
 
+  case class MapStringToString(m: Map[String, String])
+  implicit private def mapStringtoString(m: Map[String, String]): MapStringToString =
+    MapStringToString(m)
+  private def assertEquals(details1: MapStringToString, details2: MapStringToString) {
+    details1.m.zip(details2.m).foreach {
+      case ((key1, values1), (key2, values2)) =>
+        assert(key1 === key2)
+        assert(values1 === values2)
+    }
+  }
+
   private def assertEquals(exception1: Exception, exception2: Exception) {
     assert(exception1.getMessage === exception2.getMessage)
     assertSeqEquals(

From 987caa36f49889746f544e6d6d0ad1c94e643d81 Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Sun, 28 Aug 2016 16:32:36 -0400
Subject: [PATCH 07/12] fix conflicts & update to fit #14382

---
 .../apache/spark/ui/static/executorspage.js | 19 ++++++++++++-------
 project/MimaExcludes.scala                  | 10 ++--------
 2 files changed, 14 insertions(+), 15 deletions(-)

diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
index a5dd012d0a5d..b7b74abf3288 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -126,17 +126,22 @@ function formatLogsCells(execLogs, type) {
     return result;
 }
 
-<<<<<<< HEAD
 function formatWorkersCells(worker, type) {
     if ((!worker) || (worker['ui_url'] === undefined)) return "";
     if (type !== 'display') return worker['ui_url'];
     return 'Worker'
-=======
+}
+
+function workersExist(execs) {
+    return execs.some(function(exec) {
+        return !($.isEmptyObject(exec["worker"]));
+    });
+}
+
 function logsExist(execs) {
     return execs.some(function(exec) {
         return !($.isEmptyObject(exec["executorLogs"]));
     });
->>>>>>> master
 }
 
 // Determine Color Opacity from 0.5-1
@@ -415,11 +420,8 @@ $(document).ready(function () {
                     data: 'id', render: function (data, type) {
                         return type === 'display' ? ("Thread Dump" ) : data;
                     }
-                }
-<<<<<<< HEAD
                 },
                 {data: 'worker', render: formatWorkersCells}
-=======
             ],
             "columnDefs": [
                 {
@@ -429,8 +431,11 @@ $(document).ready(function () {
                 {
                     "targets": [ 16 ],
                     "visible": getThreadDumpEnabled()
+                },
+                {
+                    "targets": [ 17 ],
+                    "visible": workersExist(response)
                 }
->>>>>>> master
             ],
             "order": [[0, "asc"]]
         };
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 8606b5d476c2..a0664759a6fe 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -38,19 +38,13 @@ object MimaExcludes {
   lazy val v21excludes = v20excludes ++ {
     Seq(
       // [SPARK-16199][SQL] Add a method to list the referenced columns in data source Filter
-<<<<<<< HEAD
-      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.Filter.references")
-    ) ++
-    Seq(
-      // [SPARK-16520] [WEBUI] Link executors to corresponding worker pages
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.ExecutorInfo.this")
-=======
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.Filter.references"),
       // [SPARK-16853][SQL] Fixes encoder error in DataSet typed select
      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.Dataset.select"),
      // [SPARK-16967] Move Mesos to Module
      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX")
->>>>>>> master
+      // [SPARK-16520] [WEBUI] Link executors to corresponding worker pages
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.ExecutorInfo.this")
     )
   }

From bed03100f93bfa96d1fa027f9ec9060ca14b7311 Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Mon, 29 Aug 2016 12:57:27 -0400
Subject: [PATCH 08/12] fix mimaexcludes

---
 project/MimaExcludes.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a0664759a6fe..b8644d50df47 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -42,7 +42,7 @@ object MimaExcludes {
       // [SPARK-16853][SQL] Fixes encoder error in DataSet typed select
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.Dataset.select"),
       // [SPARK-16967] Move Mesos to Module
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX")
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX"),
       // [SPARK-16520] [WEBUI] Link executors to corresponding worker pages
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.ExecutorInfo.this")
     )
   }

From 2732db8d3d10990dacd55c02b6f606099555e83c Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Wed, 28 Dec 2016 21:22:06 -0500
Subject: [PATCH 09/12] miss a comma

---
 project/MimaExcludes.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index c5d7531e2d52..88072e3502d2 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -60,7 +60,7 @@ object MimaExcludes {
       // [SPARK-16967] Move Mesos to Module
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX"),
       // [SPARK-16520] [WEBUI] Link executors to corresponding worker pages
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.ExecutorInfo.this")
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.ExecutorInfo.this"),
       // [SPARK-16240] ML persistence backward compatibility for LDA
       ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$"),
       // [SPARK-17717] Add Find and Exists method to Catalog.

From 11fe868569431c0bc957e08dfb551a7a37a8aaf0 Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Wed, 28 Dec 2016 21:23:50 -0500
Subject: [PATCH 10/12] remove a local file

---
 b.sh | 1 -
 1 file changed, 1 deletion(-)
 delete mode 100755 b.sh

diff --git a/b.sh b/b.sh
deleted file mode 100755
index 03c89c36fb0c..000000000000
--- a/b.sh
+++ /dev/null
@@ -1 +0,0 @@
-./build/mvn -DskipTests package

From 3680eb5c8d86e2999660829d2bea91f318a0dd07 Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Tue, 3 Jan 2017 15:05:17 -0500
Subject: [PATCH 11/12] catch changes on refresh

---
 .../main/resources/org/apache/spark/ui/static/executorspage.js | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
index 5447f18373d6..12e6603d6699 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -438,6 +438,7 @@ $(document).ready(function () {
 
             var dt = $(selector).DataTable(conf);
             dt.column(15).visible(logsExist(response));
+            dt.column(17).visible(workersExist(response));
             $('#active-executors [data-toggle="tooltip"]').tooltip();
 
             var sumSelector = "#summary-execs-table";

From d23643ce79efe98e33e42c23548478d672f5de81 Mon Sep 17 00:00:00 2001
From: Tao Lin
Date: Tue, 17 Jan 2017 21:27:04 -0500
Subject: [PATCH 12/12] remove unnecessary code

---
 .../resources/org/apache/spark/ui/static/executorspage.js | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
index 12e6603d6699..37d612b7d447 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -427,10 +427,6 @@ $(document).ready(function () {
                 {
                     "targets": [ 16 ],
                     "visible": getThreadDumpEnabled()
-                },
-                {
-                    "targets": [ 17 ],
-                    "visible": workersExist(response)
                 }
             ],
             "order": [[0, "asc"]]