diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index e03977828b86d..4d1fad3770409 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.mutable
import scala.concurrent.Future
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerId
@@ -35,6 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
*/
private[spark] case class Heartbeat(
executorId: String,
+ executorMetrics: ExecutorMetrics,
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId)
@@ -119,14 +120,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
context.reply(true)
// Messages received from executors
- case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
+ case heartbeat @ Heartbeat(executorId, executorMetrics, taskMetrics, blockManagerId) =>
if (scheduler != null) {
if (executorLastSeen.contains(executorId)) {
executorLastSeen(executorId) = clock.getTimeMillis()
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
- executorId, taskMetrics, blockManagerId)
+ executorId, executorMetrics, taskMetrics, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9b14184364246..092baa0a44cdd 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -86,6 +86,12 @@ private[spark] class Executor(
env.blockManager.initialize(conf.getAppId)
}
+ private val executorMetrics: ExecutorMetrics = new ExecutorMetrics
+ executorMetrics.setHostname(Utils.localHostName)
+ if (env.rpcEnv.address != null) {
+ executorMetrics.setPort(Some(env.rpcEnv.address.port))
+ }
+
// Whether to load classes in user jars before those in Spark jars
private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)
@@ -431,7 +437,7 @@ private[spark] class Executor(
metrics.updateAccumulators()
if (isLocal) {
- // JobProgressListener will hold an reference of it during
+    // JobProgressListener will hold a reference to it during
// onExecutorMetricsUpdate(), then JobProgressListener can not see
// the changes of metrics any more, so make a deep copy of it
val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
@@ -444,7 +450,19 @@ private[spark] class Executor(
}
}
- val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
+ env.blockTransferService.getMemMetrics(this.executorMetrics)
+ val executorMetrics = if (isLocal) {
+    // A JobProgressListener might hold a reference to it during onExecutorMetricsUpdate()
+    // in the future; in that case the listener would no longer see changes to the metrics,
+    // so make a deep copy of it here.
+ Utils.deserialize[ExecutorMetrics](Utils.serialize(this.executorMetrics))
+ } else {
+ this.executorMetrics
+ }
+
+ val message = Heartbeat(
+ executorId, executorMetrics, tasksMetrics.toArray, env.blockManager.blockManagerId)
+
try {
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
new file mode 100644
index 0000000000000..d63209f52b877
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked during the execution of an executor.
+ *
+ * When adding new fields, take into consideration that the whole object can be serialized
+ * for shipping off to consumers of the SparkListener interface at any time.
+ */
+@DeveloperApi
+class ExecutorMetrics extends Serializable {
+
+ /**
+   * Name of the host the executor runs on
+ */
+ private var _hostname: String = ""
+ def hostname: String = _hostname
+ private[spark] def setHostname(value: String) = _hostname = value
+
+ /**
+   * Port of the host the executor runs on, if known
+ */
+ private var _port: Option[Int] = None
+ def port: Option[Int] = _port
+ private[spark] def setPort(value: Option[Int]) = _port = value
+
+  private[spark] def hostPort: String = {
+    port match {
+      case Some(p) => hostname + ":" + p
+      case None => hostname
+    }
+  }
+
+ private var _transportMetrics: TransportMetrics = new TransportMetrics
+ def transportMetrics: TransportMetrics = _transportMetrics
+ private[spark] def setTransportMetrics(value: TransportMetrics) = {
+ _transportMetrics = value
+ }
+}
+
+object ExecutorMetrics extends Serializable {
+ def apply(
+ hostName: String,
+ port: Option[Int],
+ transportMetrics: TransportMetrics): ExecutorMetrics = {
+ val execMetrics = new ExecutorMetrics
+ execMetrics.setHostname(hostName)
+ execMetrics.setPort(port)
+ execMetrics.setTransportMetrics(transportMetrics)
+ execMetrics
+ }
+}
+
+/**
+ * :: DeveloperApi ::
+ * Metrics for network layer
+ */
+@DeveloperApi
+class TransportMetrics(
+ val timeStamp: Long = System.currentTimeMillis,
+ val onHeapSize: Long = 0L,
+ val offHeapSize: Long = 0L) extends Serializable
+
+object TransportMetrics extends Serializable {
+ def apply(
+ timeStamp: Long,
+ onHeapSize: Long,
+ offHeapSize: Long): TransportMetrics = {
+ new TransportMetrics(timeStamp, onHeapSize, offHeapSize)
+ }
+}
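For illustration, a minimal usage sketch of the new classes (not part of the patch; the object name and the sizes are made up). It builds an ExecutorMetrics through the companion apply, exactly as the listener-facing code does internally:

import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}

object ExecutorMetricsExample {
  def main(args: Array[String]): Unit = {
    val metrics = ExecutorMetrics(
      hostName = "host-1",
      port = Some(7337),
      transportMetrics = TransportMetrics(
        timeStamp = System.currentTimeMillis,
        onHeapSize = 16L * 1024 * 1024,
        offHeapSize = 32L * 1024 * 1024))
    // Only hostname, port, and transportMetrics are publicly readable; the setters
    // and hostPort are private[spark].
    println(s"${metrics.hostname}:${metrics.port.getOrElse("-")}")
    println(metrics.transportMetrics.onHeapSize)
  }
}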
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index 15d3540f3427b..d7612e14826ce 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration
+import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.Logging
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
@@ -37,6 +38,11 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
*/
def init(blockDataManager: BlockDataManager)
+ /**
+   * Collect the transfer service's current memory metrics into the given ExecutorMetrics.
+ */
+ private[spark] def getMemMetrics(executorMetrics: ExecutorMetrics): Unit
+
/**
* Tear down the transfer service.
*/
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index f588a28eed28d..781481c909d32 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -22,7 +22,10 @@ import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
-import org.apache.spark.{SecurityManager, SparkConf}
+import io.netty.buffer._
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkEnv}
+import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory}
@@ -33,7 +36,7 @@ import org.apache.spark.network.shuffle.protocol.UploadBlock
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.storage.{BlockId, StorageLevel}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
* A BlockTransferService that uses Netty to fetch a set of blocks at at time.
@@ -50,6 +53,40 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
private[this] var server: TransportServer = _
private[this] var clientFactory: TransportClientFactory = _
private[this] var appId: String = _
+ private[this] var clock: Clock = new SystemClock()
+
+ /**
+   * Use a different clock for this transfer service. This is mainly used for testing.
+ */
+ def setClock(newClock: Clock): Unit = {
+ clock = newClock
+ }
+
+ private[spark] override def getMemMetrics(executorMetrics: ExecutorMetrics): Unit = {
+ val currentTime = clock.getTimeMillis()
+ val clientPooledAllocator = clientFactory.getPooledAllocator()
+ val serverAllocator = server.getAllocator()
+ val clientOffHeapSize: Long = sumOfMetrics(convToScala(clientPooledAllocator.directArenas()))
+ val clientOnHeapSize: Long = sumOfMetrics(convToScala(clientPooledAllocator.heapArenas()))
+ val serverOffHeapSize: Long = sumOfMetrics(convToScala(serverAllocator.directArenas()))
+ val serverOnHeapSize: Long = sumOfMetrics(convToScala(serverAllocator.heapArenas()))
+ logDebug(s"Current Netty Client offheap size is $clientOffHeapSize, " +
+ s"Client heap size is $clientOnHeapSize, Server offheap size is $serverOffHeapSize, " +
+ s"server heap size is $serverOnHeapSize, executor id is " +
+ s"${SparkEnv.get.blockManager.blockManagerId.executorId}")
+ executorMetrics.setTransportMetrics(TransportMetrics(currentTime,
+ clientOnHeapSize + serverOnHeapSize, clientOffHeapSize + serverOffHeapSize))
+ }
+
+ private def convToScala = (x: java.util.List[PoolArenaMetric]) => x.asScala
+
+  private def sumOfMetrics(arenaMetricList: Seq[PoolArenaMetric]): Long = {
+    arenaMetricList.map { arena =>
+      arena.chunkLists().asScala.map { chunkList =>
+        // Widen to Long before summing to avoid Int overflow on large pools
+        chunkList.iterator().asScala.map(_.chunkSize().toLong).sum
+      }.sum
+    }.sum
+  }
override def init(blockDataManager: BlockDataManager): Unit = {
val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
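As a sanity check of the arena traversal in sumOfMetrics above, here is a self-contained sketch that aggregates chunk sizes from a standalone pooled allocator. It relies on the same Netty accessors the patch uses (heapArenas()/directArenas() returning java.util.List[PoolArenaMetric] on PooledByteBufAllocator in Netty 4.0.x); the object name is illustrative:

import scala.collection.JavaConverters._

import io.netty.buffer.{PoolArenaMetric, PooledByteBufAllocator}

object ArenaMetricsSketch {
  // Same shape as sumOfMetrics above: total bytes of all chunks across the arenas.
  def totalChunkBytes(arenas: Seq[PoolArenaMetric]): Long =
    arenas.map { arena =>
      arena.chunkLists().asScala.map { chunkList =>
        chunkList.iterator().asScala.map(_.chunkSize().toLong).sum
      }.sum
    }.sum

  def main(args: Array[String]): Unit = {
    val alloc = PooledByteBufAllocator.DEFAULT
    val buf = alloc.directBuffer(1024 * 1024) // force at least one chunk allocation
    println(s"on-heap: ${totalChunkBytes(alloc.heapArenas().asScala)} bytes")
    println(s"off-heap: ${totalChunkBytes(alloc.directArenas().asScala)} bytes")
    buf.release()
  }
}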
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6b01a10fc136e..11a98eb3ba7d8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -34,7 +34,7 @@ import org.apache.commons.lang3.SerializationUtils
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
@@ -216,15 +216,16 @@ class DAGScheduler(
}
/**
- * Update metrics for in-progress tasks and let the master know that the BlockManager is still
- * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
- * indicating that the block manager should re-register.
+   * Update metrics for a live executor and its in-progress tasks, and let the master know that the
+ * BlockManager is still alive. Return true if the driver knows about the given block manager.
+ * Otherwise, return false, indicating that the block manager should re-register.
*/
def executorHeartbeatReceived(
execId: String,
+ executorMetrics: ExecutorMetrics,
taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stateAttempt, metrics)
blockManagerId: BlockManagerId): Boolean = {
- listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
+ listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, executorMetrics, taskMetrics))
blockManagerMaster.driverEndpoint.askWithRetry[Boolean](
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index aa607c5a2df93..cc3d74418a0ae 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -21,7 +21,7 @@ import java.io._
import java.net.URI
import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, HashMap}
import com.google.common.base.Charsets
import org.apache.hadoop.conf.Configuration
@@ -32,6 +32,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.{Logging, SPARK_VERSION, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{JsonProtocol, Utils}
@@ -85,6 +86,10 @@ private[spark] class EventLoggingListener(
// Visible for tests only.
private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
+ private val executorIdToLatestMetrics = new HashMap[String, SparkListenerExecutorMetricsUpdate]
+  private val executorIdToModifiedMaxMetrics =
+    new HashMap[String, SparkListenerExecutorMetricsUpdate]
+
/**
* Creates the log file in the configured log directory.
*/
@@ -146,8 +151,25 @@ private[spark] class EventLoggingListener(
}
}
+  // When a stage is submitted or completed, we update our executor memory metrics for that
+  // stage and then log them. Anytime we receive more executor metrics, we update our running
+  // maps {{executorIdToLatestMetrics}} and {{executorIdToModifiedMaxMetrics}}. Since stage
+  // submit and complete times might be interleaved, we maintain both the latest and the max
+  // metrics for each time segment: at each stage start and stage completion we log and clear
+  // the max metrics accumulated in {{executorIdToModifiedMaxMetrics}}, then log the entries
+  // in {{executorIdToLatestMetrics}} as the baseline for the next segment.
+  private def updateAndLogExecutorMemoryMetrics(): Unit = {
+    executorIdToModifiedMaxMetrics.foreach { case (_, metrics) => logEvent(metrics) }
+    // Clear the modified metrics map after each log action
+    executorIdToModifiedMaxMetrics.clear()
+    executorIdToLatestMetrics.foreach { case (_, metrics) => logEvent(metrics) }
+  }
+
// Events that do not trigger a flush
- override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event)
+ override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+ updateAndLogExecutorMemoryMetrics()
+ logEvent(event)
+ }
override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event)
@@ -159,6 +181,7 @@ private[spark] class EventLoggingListener(
// Events that trigger a flush
override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+ updateAndLogExecutorMemoryMetrics()
logEvent(event, flushLogger = true)
}
@@ -185,19 +208,29 @@ private[spark] class EventLoggingListener(
override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
logEvent(event, flushLogger = true)
}
+
override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
logEvent(event, flushLogger = true)
}
override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
+ executorIdToLatestMetrics.remove(event.executorId)
+ executorIdToModifiedMaxMetrics.remove(event.executorId)
logEvent(event, flushLogger = true)
}
// No-op because logging every update would be overkill
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}
- // No-op because logging every update would be overkill
- override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
+ // Track executor metrics for logging on stage start and end
+ override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
+    // We only track executor-level metrics for each stage, so we drop the task metrics, which
+    // are quite verbose
+ val eventWithoutTaskMetrics = SparkListenerExecutorMetricsUpdate(
+ event.execId, event.executorMetrics, Seq.empty)
+ executorIdToLatestMetrics(eventWithoutTaskMetrics.execId) = eventWithoutTaskMetrics
+ updateModifiedMetrics(eventWithoutTaskMetrics)
+ }
override def onOtherEvent(event: SparkListenerEvent): Unit = {
logEvent(event, flushLogger = true)
@@ -224,6 +257,44 @@ private[spark] class EventLoggingListener(
fileSystem.rename(new Path(logPath + IN_PROGRESS), target)
}
+ /**
+   * Update the maintained max metrics for an executor according to the latest received event.
+   * @param latestEvent the latest event received, used to update the maintained max metrics
+ */
+ private def updateModifiedMetrics(latestEvent: SparkListenerExecutorMetricsUpdate): Unit = {
+ val executorId = latestEvent.execId
+ val toBeModifiedEvent = executorIdToModifiedMaxMetrics.get(executorId)
+ toBeModifiedEvent match {
+ case None =>
+ executorIdToModifiedMaxMetrics(executorId) = latestEvent
+ case Some(toBeModifiedEvent) =>
+        val latestTransMetrics = latestEvent.executorMetrics.transportMetrics
+        val toBeModTransMetrics = toBeModifiedEvent.executorMetrics.transportMetrics
+ var timeStamp: Long = toBeModTransMetrics.timeStamp
+        // the logic here should be kept consistent with that in MemoryListener
+ val onHeapSize = if (latestTransMetrics.onHeapSize > toBeModTransMetrics.onHeapSize) {
+ timeStamp = latestTransMetrics.timeStamp
+ latestTransMetrics.onHeapSize
+ } else {
+ toBeModTransMetrics.onHeapSize
+ }
+ val offHeapSize = if (latestTransMetrics.offHeapSize > toBeModTransMetrics.offHeapSize) {
+ timeStamp = latestTransMetrics.timeStamp
+ latestTransMetrics.offHeapSize
+ } else {
+ toBeModTransMetrics.offHeapSize
+ }
+
+      // Create a new instance for each update to avoid side effects
+ val modifiedExecMetrics = ExecutorMetrics(toBeModifiedEvent.executorMetrics.hostname,
+ toBeModifiedEvent.executorMetrics.port,
+ TransportMetrics(timeStamp, onHeapSize, offHeapSize))
+ val modifiedEvent = SparkListenerExecutorMetricsUpdate(
+ toBeModifiedEvent.execId, modifiedExecMetrics, toBeModifiedEvent.taskMetrics)
+ executorIdToModifiedMaxMetrics(executorId) = modifiedEvent
+ }
+ }
}
private[spark] object EventLoggingListener extends Logging {
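To make the max-merge in updateModifiedMetrics concrete, here is a small sketch with hypothetical values; the helper simply extracts the per-dimension logic of the method above:

import org.apache.spark.executor.TransportMetrics

object MaxMergeSketch {
  def merge(retained: TransportMetrics, latest: TransportMetrics): TransportMetrics = {
    var timeStamp = retained.timeStamp
    val onHeap = if (latest.onHeapSize > retained.onHeapSize) {
      timeStamp = latest.timeStamp
      latest.onHeapSize
    } else retained.onHeapSize
    val offHeap = if (latest.offHeapSize > retained.offHeapSize) {
      timeStamp = latest.timeStamp
      latest.offHeapSize
    } else retained.offHeapSize
    TransportMetrics(timeStamp, onHeap, offHeap)
  }

  def main(args: Array[String]): Unit = {
    val m = merge(TransportMetrics(2L, 30L, 10L), TransportMetrics(4L, 20L, 25L))
    // Keeps the per-dimension maxima (30 on-heap, 25 off-heap); the timestamp follows
    // the last dimension that grew, here the off-heap update at t = 4.
    println((m.timeStamp, m.onHeapSize, m.offHeapSize)) // (4, 30, 25)
  }
}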
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 3130a65240a99..345a6b737eea4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -26,7 +26,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo
import org.apache.spark.{Logging, SparkConf, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
import org.apache.spark.ui.SparkUI
@@ -108,11 +108,13 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
/**
* Periodic updates from executors.
* @param execId executor id
+ * @param executorMetrics executor-level metrics
* @param taskMetrics sequence of (task id, stage id, stage attempt, metrics)
*/
@DeveloperApi
case class SparkListenerExecutorMetricsUpdate(
execId: String,
+ executorMetrics: ExecutorMetrics,
taskMetrics: Seq[(Long, Int, Int, TaskMetrics)])
extends SparkListenerEvent
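A hypothetical consumer of the extended event, to illustrate the new field (the listener name is made up; attach it with sc.addSparkListener):

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

class NetworkMemoryLogger extends SparkListener {
  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    val m = event.executorMetrics
    val t = m.transportMetrics
    // Executor-level metrics arrive with every heartbeat, alongside the task metrics.
    println(s"executor ${event.execId} on ${m.hostname}: " +
      s"netty on-heap ${t.onHeapSize} B, off-heap ${t.offHeapSize} B (at ${t.timeStamp})")
  }
}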
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 7c0b007db708e..01d0ed0ddcec0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
@@ -65,8 +65,11 @@ private[spark] trait TaskScheduler {
* alive. Return true if the driver knows about the given block manager. Otherwise, return false,
* indicating that the block manager should re-register.
*/
- def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
- blockManagerId: BlockManagerId): Boolean
+ def executorHeartbeatReceived(
+ execId: String,
+ executorMetrics: ExecutorMetrics,
+ taskMetrics: Array[(Long, TaskMetrics)],
+ blockManagerId: BlockManagerId): Boolean
/**
* Get an application ID associated with the job.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 6e3ef0e54f0fd..c88ebe8ec7668 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -30,7 +30,7 @@ import scala.util.Random
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.storage.BlockManagerId
@@ -380,6 +380,7 @@ private[spark] class TaskSchedulerImpl(
*/
override def executorHeartbeatReceived(
execId: String,
+ executorMetrics: ExecutorMetrics,
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId): Boolean = {
@@ -390,7 +391,8 @@ private[spark] class TaskSchedulerImpl(
}
}
}
- dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
+ dagScheduler.executorHeartbeatReceived(
+ execId, executorMetrics, metricsWithStageIds, blockManagerId)
}
def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index e319937702f23..8730a53065424 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -30,6 +30,7 @@ import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab}
+import org.apache.spark.ui.memory.{MemoryListener, MemoryTab}
import org.apache.spark.ui.scope.RDDOperationGraphListener
import org.apache.spark.ui.storage.{StorageListener, StorageTab}
import org.apache.spark.util.Utils
@@ -46,6 +47,7 @@ private[spark] class SparkUI private (
val executorsListener: ExecutorsListener,
val jobProgressListener: JobProgressListener,
val storageListener: StorageListener,
+ val memoryListener: MemoryListener,
val operationGraphListener: RDDOperationGraphListener,
var appName: String,
val basePath: String,
@@ -68,6 +70,7 @@ private[spark] class SparkUI private (
attachTab(new StorageTab(this))
attachTab(new EnvironmentTab(this))
attachTab(new ExecutorsTab(this))
+ attachTab(new MemoryTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
attachHandler(ApiRootResource.getServletHandler(this))
@@ -196,16 +199,18 @@ private[spark] object SparkUI {
val storageStatusListener = new StorageStatusListener
val executorsListener = new ExecutorsListener(storageStatusListener)
val storageListener = new StorageListener(storageStatusListener)
+ val memoryListener = new MemoryListener
val operationGraphListener = new RDDOperationGraphListener(conf)
listenerBus.addListener(environmentListener)
listenerBus.addListener(storageStatusListener)
listenerBus.addListener(executorsListener)
listenerBus.addListener(storageListener)
+ listenerBus.addListener(memoryListener)
listenerBus.addListener(operationGraphListener)
new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
- executorsListener, _jobProgressListener, storageListener, operationGraphListener,
- appName, basePath, startTime)
+ executorsListener, _jobProgressListener, storageListener, memoryListener,
+ operationGraphListener, appName, basePath, startTime)
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/memory/MemoryPage.scala b/core/src/main/scala/org/apache/spark/ui/memory/MemoryPage.scala
new file mode 100644
index 0000000000000..b674f074fe62e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/memory/MemoryPage.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.memory
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{Node, NodeSeq}
+
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[ui] class MemoryPage(parent: MemoryTab) extends WebUIPage("") {
+ private val memoryListener = parent.memoryListener
+ private val progressListener = parent.progressListener
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+
+ val activeExecutorIdToMem = memoryListener.activeExecutorIdToMem
+ val removedExecutorIdToMem = memoryListener.removedExecutorIdToMem
+ val completedStages = progressListener.completedStages.reverse.toSeq
+ val failedStages = progressListener.failedStages.reverse.toSeq
+ val numberCompletedStages = progressListener.numCompletedStages
+ val numberFailedStages = progressListener.numFailedStages
+ val activeMemInfoSorted = activeExecutorIdToMem.toSeq.sortBy(_._1)
+ val removedMemInfoSorted = removedExecutorIdToMem.toSeq.sortBy(_._1)
+ val shouldShowActiveExecutors = activeExecutorIdToMem.nonEmpty
+ val shouldShowRemovedExecutors = removedExecutorIdToMem.nonEmpty
+ val shouldShowCompletedStages = completedStages.nonEmpty
+ val shouldShowFailedStages = failedStages.nonEmpty
+
+ val activeExecMemTable = new MemTableBase(activeMemInfoSorted, memoryListener)
+ val removedExecMemTable = new MemTableBase(removedMemInfoSorted, memoryListener)
+ val completedStagesTable = new StagesTableBase(
+ completedStages, parent.basePath, progressListener)
+ val failedStagesTable = new StagesTableBase(failedStages, parent.basePath, progressListener)
+
+    val summary: NodeSeq =
+      <div>
+        <ul class="unstyled">
+          {
+            if (shouldShowActiveExecutors) {
+              <li>
+                <strong>Active Executors:</strong>
+                {activeExecutorIdToMem.size}
+              </li>
+            }
+          }
+          {
+            if (shouldShowRemovedExecutors) {
+              <li>
+                <strong>Removed Executors:</strong>
+                {removedExecutorIdToMem.size}
+              </li>
+            }
+          }
+          {
+            if (shouldShowCompletedStages) {
+              <li>
+                <strong>Completed Stages:</strong>
+                {numberCompletedStages}
+              </li>
+            }
+          }
+          {
+            if (shouldShowFailedStages) {
+              <li>
+                <strong>Failed Stages:</strong>
+                {numberFailedStages}
+              </li>
+            }
+          }
+        </ul>
+      </div>
+
+ var content = summary
+    if (shouldShowActiveExecutors) {
+      content ++= <h4>Active Executors ({activeExecutorIdToMem.size})</h4> ++
+        activeExecMemTable.toNodeSeq
+    }
+    if (shouldShowRemovedExecutors) {
+      content ++= <h4>Removed Executors ({removedMemInfoSorted.size})</h4> ++
+        removedExecMemTable.toNodeSeq
+    }
+    if (shouldShowCompletedStages) {
+      content ++= <h4>Completed Stages ({numberCompletedStages})</h4> ++
+        completedStagesTable.toNodeSeq
+    }
+    if (shouldShowFailedStages) {
+      content ++= <h4>Failed Stages ({numberFailedStages})</h4> ++
+        failedStagesTable.toNodeSeq
+    }
+
+ UIUtils.headerSparkPage("Memory Usage", content, parent)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala
new file mode 100644
index 0000000000000..cc23334860a5a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.memory
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.ui.{SparkUI, SparkUITab}
+
+private[ui] class MemoryTab(parent: SparkUI) extends SparkUITab(parent, "memory") {
+ val memoryListener = parent.memoryListener
+ val progressListener = parent.jobProgressListener
+ attachPage(new MemoryPage(this))
+ attachPage(new StageMemoryPage(this))
+}
+
+/**
+ * :: DeveloperApi ::
+ * A SparkListener that prepares information to be displayed on the MemoryTab
+ */
+@DeveloperApi
+class MemoryListener extends SparkListener {
+ type ExecutorId = String
+ val activeExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
+  // TODO There might be plenty of removed executors (e.g. in dynamic allocation mode), which
+  // may use too much memory.
+ val removedExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
+ // A map that maintains the latest metrics of each active executor
+ val latestExecIdToExecMetrics = new HashMap[ExecutorId, ExecutorMetrics]
+  // activeStagesToMem maintains the memory information of all executors for each active stage;
+  // the map type is [(stageId, attemptId) -> HashMap[executorId, MemoryUIInfo]]
+ val activeStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]]
+  // TODO We should respect the retained-stages conf so that we don't keep every completed
+  // stage, since there might be too many of them.
+ val completedStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]]
+
+ override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
+ val executorId = event.execId
+ val executorMetrics = event.executorMetrics
+ val memoryInfo = activeExecutorIdToMem.getOrElseUpdate(executorId, new MemoryUIInfo)
+ memoryInfo.updateMemUiInfo(executorMetrics)
+ activeStagesToMem.foreach { case (_, stageMemMetrics) =>
+      // If an executor was added while the stage was running, we also update the metrics for
+      // that executor in {{activeStagesToMem}}
+ if (!stageMemMetrics.contains(executorId)) {
+ stageMemMetrics(executorId) = new MemoryUIInfo
+ }
+ stageMemMetrics(executorId).updateMemUiInfo(executorMetrics)
+ }
+ latestExecIdToExecMetrics(executorId) = executorMetrics
+ }
+
+ override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
+ val executorId = event.executorId
+ activeExecutorIdToMem.put(executorId, new MemoryUIInfo(event.executorInfo))
+ }
+
+ override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
+ val executorId = event.executorId
+ val info = activeExecutorIdToMem.remove(executorId)
+ latestExecIdToExecMetrics.remove(executorId)
+ removedExecutorIdToMem.getOrElseUpdate(executorId, info.getOrElse(new MemoryUIInfo))
+ }
+
+ override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+ val stage = (event.stageInfo.stageId, event.stageInfo.attemptId)
+ val memInfoMap = new HashMap[ExecutorId, MemoryUIInfo]
+    activeExecutorIdToMem.foreach { case (id, _) =>
+ memInfoMap(id) = new MemoryUIInfo
+      latestExecIdToExecMetrics.get(id).foreach { metrics =>
+        memInfoMap(id).updateMemUiInfo(metrics)
+      }
+ }
+ activeStagesToMem(stage) = memInfoMap
+ }
+
+ override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+ val stage = (event.stageInfo.stageId, event.stageInfo.attemptId)
+    // We need to refresh {{activeStagesToMem}} with {{activeExecutorIdToMem}} in case an
+    // executor was added while the stage was running and no
+    // {{SparkListenerExecutorMetricsUpdate}} event was received during this stage.
+    activeStagesToMem.get(stage).foreach { memInfoMap =>
+ activeExecutorIdToMem.foreach { case (executorId, memUiInfo) =>
+ if (!memInfoMap.contains(executorId)) {
+ memInfoMap(executorId) = new MemoryUIInfo
+ memInfoMap(executorId).copyMemUiInfo(memUiInfo)
+ }
+ }
+ }
+    activeStagesToMem.remove(stage).foreach(completedStagesToMem.put(stage, _))
+ }
+}
+
+@DeveloperApi
+class MemoryUIInfo {
+ var executorAddress: String = _
+ var transportInfo: Option[TransportMemSize] = None
+
+ def this(execInfo: ExecutorInfo) = {
+ this()
+ executorAddress = execInfo.executorHost
+ }
+
+ def updateMemUiInfo(execMetrics: ExecutorMetrics): Unit = {
+    transportInfo = transportInfo.orElse(Some(new TransportMemSize))
+ executorAddress = execMetrics.hostPort
+ transportInfo.get.updateTransMemSize(execMetrics.transportMetrics)
+ }
+
+  def copyMemUiInfo(memUiInfo: MemoryUIInfo): Unit = {
+    executorAddress = memUiInfo.executorAddress
+    // Copy transport info whenever the source has it, creating the target if needed
+    memUiInfo.transportInfo.foreach { other =>
+      val target = transportInfo.getOrElse(new TransportMemSize)
+      target.copyTransMemSize(other)
+      transportInfo = Some(target)
+    }
+  }
+}
+
+@DeveloperApi
+class TransportMemSize {
+ var onHeapSize: Long = _
+ var offHeapSize: Long = _
+ var peakOnHeapSizeTime: MemTime = new MemTime()
+ var peakOffHeapSizeTime: MemTime = new MemTime()
+
+ def updateTransMemSize(transportMetrics: TransportMetrics): Unit = {
+ val updatedOnHeapSize = transportMetrics.onHeapSize
+ val updatedOffHeapSize = transportMetrics.offHeapSize
+ val updateTime: Long = transportMetrics.timeStamp
+ onHeapSize = updatedOnHeapSize
+ offHeapSize = updatedOffHeapSize
+ if (updatedOnHeapSize >= peakOnHeapSizeTime.memorySize) {
+ peakOnHeapSizeTime = MemTime(updatedOnHeapSize, updateTime)
+ }
+ if (updatedOffHeapSize >= peakOffHeapSizeTime.memorySize) {
+ peakOffHeapSizeTime = MemTime(updatedOffHeapSize, updateTime)
+ }
+ }
+
+ def copyTransMemSize(transMemSize: TransportMemSize): Unit = {
+ onHeapSize = transMemSize.onHeapSize
+ offHeapSize = transMemSize.offHeapSize
+ peakOnHeapSizeTime = MemTime(transMemSize.peakOnHeapSizeTime.memorySize,
+ transMemSize.peakOnHeapSizeTime.timeStamp)
+ peakOffHeapSizeTime = MemTime(transMemSize.peakOffHeapSizeTime.memorySize,
+ transMemSize.peakOffHeapSizeTime.timeStamp)
+ }
+}
+
+@DeveloperApi
+case class MemTime(memorySize: Long = 0L, timeStamp: Long = System.currentTimeMillis)
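A minimal sketch of driving MemoryListener by hand, mirroring what the listener bus delivers at runtime (the executor id, host, and sizes are illustrative):

import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate
import org.apache.spark.ui.memory.MemoryListener

object MemoryListenerSketch {
  def main(args: Array[String]): Unit = {
    val listener = new MemoryListener
    val metrics = ExecutorMetrics("host-1", Some(7337), TransportMetrics(1L, 20L, 10L))
    listener.onExecutorMetricsUpdate(
      SparkListenerExecutorMetricsUpdate("exec-1", metrics, Seq.empty))
    // The per-executor entry now carries current sizes plus the peak MemTime markers.
    val info = listener.activeExecutorIdToMem("exec-1")
    info.transportInfo.foreach { t =>
      println((t.onHeapSize, t.offHeapSize, t.peakOnHeapSizeTime, t.peakOffHeapSizeTime))
    }
  }
}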
diff --git a/core/src/main/scala/org/apache/spark/ui/memory/MemoryTable.scala b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTable.scala
new file mode 100644
index 0000000000000..11a1f0dc321ef
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTable.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.memory
+
+import java.util.Date
+
+import scala.xml.Node
+
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.ui.jobs.JobProgressListener
+import org.apache.spark.ui.UIUtils
+import org.apache.spark.util.Utils
+
+
+private[ui] class MemTableBase(
+ memInfos: Seq[(String, MemoryUIInfo)],
+ listener: MemoryListener) {
+
+  protected def columns: Seq[Node] = {
+    <th>Executor ID</th>
+    <th>Address</th>
+    <th>Network Memory (on-heap)</th>
+    <th>Network Memory (off-heap)</th>
+    <th>Peak Network Memory (on-heap) / Happen Time</th>
+    <th>Peak Network Memory (off-heap) / Happen Time</th>
+  }
+
+ def toNodeSeq: Seq[Node] = {
+ listener.synchronized {
+ memTable(showRow, memInfos)
+ }
+ }
+
+  protected def memTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
+    <table class="table table-bordered table-condensed table-striped">
+      <thead>
+        {columns}
+      </thead>
+      <tbody>
+        {rows.map(r => makeRow(r))}
+      </tbody>
+    </table>
+  }
+
+  /** Render an HTML row representing an executor */
+  private def showRow(info: (String, MemoryUIInfo)): Seq[Node] = {
+    <tr>
+      <td>
+        {info._1}
+      </td>
+      <td>
+        {info._2.executorAddress}
+      </td>
+      {if (info._2.transportInfo.isDefined) {
+        <td>
+          {Utils.bytesToString(info._2.transportInfo.get.onHeapSize)}
+        </td>
+        <td>
+          {Utils.bytesToString(info._2.transportInfo.get.offHeapSize)}
+        </td>
+        <td>
+          {Utils.bytesToString(info._2.transportInfo.get.peakOnHeapSizeTime.memorySize)}
+          /
+          {UIUtils.formatDate(info._2.transportInfo.get.peakOnHeapSizeTime.timeStamp)}
+        </td>
+        <td>
+          {Utils.bytesToString(info._2.transportInfo.get.peakOffHeapSizeTime.memorySize)}
+          /
+          {UIUtils.formatDate(info._2.transportInfo.get.peakOffHeapSizeTime.timeStamp)}
+        </td>
+      } else {
+        <td>N/A</td>
+        <td>N/A</td>
+        <td>N/A</td>
+        <td>N/A</td>
+      }}
+    </tr>
+  }
+}
+
+private[ui] class StagesTableBase(
+ stageInfos: Seq[StageInfo],
+ basePath: String,
+ listener: JobProgressListener) {
+  protected def columns: Seq[Node] = {
+    <th>Stage Id</th>
+    <th>Description</th>
+    <th>Submitted</th>
+  }
+
+ def toNodeSeq: Seq[Node] = {
+ listener.synchronized {
+ stagesTable(showRow, stageInfos)
+ }
+ }
+
+  protected def stagesTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
+    <table class="table table-bordered table-condensed table-striped">
+      <thead>
+        {columns}
+      </thead>
+      <tbody>
+        {rows.map(r => makeRow(r))}
+      </tbody>
+    </table>
+  }
+
+  private def showRow(info: StageInfo): Seq[Node] = {
+    val submissionTime = info.submissionTime match {
+      case Some(t) => UIUtils.formatDate(new Date(t))
+      case None => "Unknown"
+    }
+
+    <tr>
+      <td>{info.stageId}</td>
+      <td>{makeDescription(info)}</td>
+      <td>{submissionTime}</td>
+    </tr>
+  }
+
+  private def makeDescription(s: StageInfo): Seq[Node] = {
+    val basePathUri = UIUtils.prependBaseUri(basePath)
+    val nameLinkUri = s"$basePathUri/memory/stage?id=${s.stageId}&attempt=${s.attemptId}"
+    <a href={nameLinkUri}>{s.name}</a>
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/memory/StageMemoryPage.scala b/core/src/main/scala/org/apache/spark/ui/memory/StageMemoryPage.scala
new file mode 100644
index 0000000000000..b5059dce2a522
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/memory/StageMemoryPage.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.memory
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+/** Page showing memory information for a given stage */
+private[ui] class StageMemoryPage(parent: MemoryTab) extends WebUIPage("stage") {
+ private val memoryListener = parent.memoryListener
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ memoryListener.synchronized {
+ val parameterId = request.getParameter("id")
+ require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+ val parameterAttempt = request.getParameter("attempt")
+ require(parameterAttempt != null && parameterAttempt.nonEmpty, "Missing attempt parameter")
+
+ val stage = (parameterId.toInt, parameterAttempt.toInt)
+
+ val finishedStageToMem = memoryListener.completedStagesToMem
+      val content = if (finishedStageToMem.contains(stage)) {
+        val executorIdToMem = finishedStageToMem(stage).toSeq.sortBy(_._1)
+        val execMemTable = new MemTableBase(executorIdToMem, memoryListener)
+        <h4>Executors ({executorIdToMem.size})</h4> ++
+          execMemTable.toNodeSeq
+      } else {
+        Seq.empty
+      }
+ UIUtils.headerSparkPage("Stage Detail Memory Usage", content, parent)
+ }
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a62fd2f339285..d685b1902758b 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -234,8 +234,10 @@ private[spark] object JsonProtocol {
def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
val execId = metricsUpdate.execId
val taskMetrics = metricsUpdate.taskMetrics
+ val executorMetrics = metricsUpdate.executorMetrics
("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
("Executor ID" -> execId) ~
+ ("Executor Metrics Updated" -> executorMetricsToJson(executorMetrics)) ~
("Metrics Updated" -> taskMetrics.map { case (taskId, stageId, stageAttemptId, metrics) =>
("Task ID" -> taskId) ~
("Stage ID" -> stageId) ~
@@ -291,6 +293,19 @@ private[spark] object JsonProtocol {
("Internal" -> accumulableInfo.internal)
}
+ def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = {
+ val transportMetrics = transportMetricsToJson(executorMetrics.transportMetrics)
+ ("Executor Hostname" -> executorMetrics.hostname) ~
+ ("Executor Port" -> executorMetrics.port.map(new JInt(_)).getOrElse(JNothing)) ~
+ ("TransportMetrics" -> transportMetrics)
+ }
+
+ def transportMetricsToJson(transportMetrics: TransportMetrics): JValue = {
+ ("TimeStamp" -> transportMetrics.timeStamp) ~
+ ("OnHeapSize" -> transportMetrics.onHeapSize) ~
+ ("OffHeapSize" -> transportMetrics.offHeapSize)
+ }
+
def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
val shuffleReadMetrics =
taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing)
@@ -632,6 +647,7 @@ private[spark] object JsonProtocol {
def executorMetricsUpdateFromJson(json: JValue): SparkListenerExecutorMetricsUpdate = {
val execInfo = (json \ "Executor ID").extract[String]
+ val executorMetrics = executorMetricsFromJson(json \ "Executor Metrics Updated")
val taskMetrics = (json \ "Metrics Updated").extract[List[JValue]].map { json =>
val taskId = (json \ "Task ID").extract[Long]
val stageId = (json \ "Stage ID").extract[Int]
@@ -639,7 +655,7 @@ private[spark] object JsonProtocol {
val metrics = taskMetricsFromJson(json \ "Task Metrics")
(taskId, stageId, stageAttemptId, metrics)
}
- SparkListenerExecutorMetricsUpdate(execInfo, taskMetrics)
+ SparkListenerExecutorMetricsUpdate(execInfo, executorMetrics, taskMetrics)
}
/** --------------------------------------------------------------------- *
@@ -710,6 +726,25 @@ private[spark] object JsonProtocol {
AccumulableInfo(id, name, update, value, internal)
}
+ def executorMetricsFromJson(json: JValue): ExecutorMetrics = {
+ val metrics = new ExecutorMetrics
+ if (json == JNothing) {
+ return metrics
+ }
+ metrics.setHostname((json \ "Executor Hostname").extract[String])
+ metrics.setPort(Utils.jsonOption(json \ "Executor Port").map(_.extract[Int]))
+ metrics.setTransportMetrics(transportMetricsFromJson(json \ "TransportMetrics"))
+ metrics
+ }
+
+  def transportMetricsFromJson(json: JValue): TransportMetrics = {
+    new TransportMetrics(
+      (json \ "TimeStamp").extract[Long],
+      (json \ "OnHeapSize").extract[Long],
+      (json \ "OffHeapSize").extract[Long])
+  }
+
def taskMetricsFromJson(json: JValue): TaskMetrics = {
if (json == JNothing) {
return TaskMetrics.empty
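A round-trip sketch for the new (de)serialization helpers above. Since JsonProtocol is private[spark], the sketch assumes code living under org.apache.spark; the expected JSON shape follows directly from executorMetricsToJson:

package org.apache.spark

import org.json4s.jackson.JsonMethods.{compact, render}

import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}
import org.apache.spark.util.JsonProtocol

object ExecutorMetricsJsonSketch {
  def main(args: Array[String]): Unit = {
    val in = ExecutorMetrics("host-1", Some(7337), TransportMetrics(1L, 20L, 10L))
    val json = JsonProtocol.executorMetricsToJson(in)
    // Expected: {"Executor Hostname":"host-1","Executor Port":7337,
    //            "TransportMetrics":{"TimeStamp":1,"OnHeapSize":20,"OffHeapSize":10}}
    println(compact(render(json)))
    val out = JsonProtocol.executorMetricsFromJson(json)
    assert(out.hostname == in.hostname && out.port == in.port)
    assert(out.transportMetrics.offHeapSize == in.transportMetrics.offHeapSize)
  }
}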
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 18e53508406dc..3fb0ac997c3c7 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -30,7 +30,7 @@ import org.mockito.Matchers._
import org.mockito.Mockito.{mock, spy, verify, when}
import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -78,7 +78,7 @@ class HeartbeatReceiverSuite
heartbeatReceiverClock = new ManualClock
heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock)
heartbeatReceiverRef = sc.env.rpcEnv.setupEndpoint("heartbeat", heartbeatReceiver)
- when(scheduler.executorHeartbeatReceived(any(), any(), any())).thenReturn(true)
+ when(scheduler.executorHeartbeatReceived(any(), any(), any(), any())).thenReturn(true)
}
/**
@@ -215,14 +215,15 @@ class HeartbeatReceiverSuite
val metrics = new TaskMetrics
val blockManagerId = BlockManagerId(executorId, "localhost", 12345)
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
- Heartbeat(executorId, Array(1L -> metrics), blockManagerId))
+ Heartbeat(executorId, null, Array(1L -> metrics), blockManagerId))
if (executorShouldReregister) {
assert(response.reregisterBlockManager)
} else {
assert(!response.reregisterBlockManager)
// Additionally verify that the scheduler callback is called with the correct parameters
verify(scheduler).executorHeartbeatReceived(
- Matchers.eq(executorId), Matchers.eq(Array(1L -> metrics)), Matchers.eq(blockManagerId))
+        Matchers.eq(executorId), Matchers.eq(null),
+        Matchers.eq(Array(1L -> metrics)), Matchers.eq(blockManagerId))
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 370a284d2950f..1d4f0840af9ff 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
import org.apache.spark._
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
@@ -111,8 +111,8 @@ class DAGSchedulerSuite
override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def start() = {}
override def stop() = {}
- override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
- blockManagerId: BlockManagerId): Boolean = true
+ override def executorHeartbeatReceived(execId: String, executorMetrics: ExecutorMetrics,
+ taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true
override def submitTasks(taskSet: TaskSet) = {
// normally done by TaskSetManager
taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
@@ -464,6 +464,7 @@ class DAGSchedulerSuite
override def defaultParallelism(): Int = 2
override def executorHeartbeatReceived(
execId: String,
+ executorMetrics: ExecutorMetrics,
taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 43da6fc5b5474..8fb955c5ea8f6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -122,6 +122,135 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
"a fine:mind$dollar{bills}.1", None, Some("lz4")))
}
+ test("test event logger logging executor metrics") {
+ import org.apache.spark.scheduler.cluster._
+ import org.apache.spark.ui.memory._
+ val conf = EventLoggingListenerSuite.getLoggingConf(testDirPath)
+ val eventLogger = new EventLoggingListener("test-memListener", None, testDirPath.toUri(), conf)
+ val execId = "exec-1"
+ val hostName = "host-1"
+ val port: Option[Int] = Some(80)
+
+ eventLogger.start()
+ eventLogger.onExecutorAdded(SparkListenerExecutorAdded(
+ 0L, execId, new ExecutorInfo(hostName, 1, Map.empty)))
+
+ // stage 1 and stage 2 submitted
+ eventLogger.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(1))
+ eventLogger.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(2))
+ val execMetrics1 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 1L, 20, 10)
+ eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+ execId, execMetrics1))
+ val execMetrics2 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 2L, 30, 10)
+ eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+ execId, execMetrics2))
+ // stage1 completed
+ eventLogger.onStageCompleted(MemoryListenerSuite.createStageEndEvent(1))
+ // stage3 submitted
+ eventLogger.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(3))
+ val execMetrics3 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 3L, 30, 30)
+ eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+ execId, execMetrics3))
+ val execMetrics4 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 4L, 20, 25)
+ eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+ execId, execMetrics4))
+ // stage 2 completed
+ eventLogger.onStageCompleted(MemoryListenerSuite.createStageEndEvent(2))
+ val execMetrics5 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 5L, 15, 15)
+ eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+ execId, execMetrics5))
+ val execMetrics6 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 6L, 25, 10)
+ eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+ execId, execMetrics6))
+ // stage 3 completed
+ eventLogger.onStageCompleted(MemoryListenerSuite.createStageEndEvent(3))
+
+ eventLogger.onExecutorRemoved(SparkListenerExecutorRemoved(7L, execId, ""))
+
+    // In total there are 15 logged events:
+    // 2 executor added/removed events,
+    // 6 stage submitted/completed events, and
+    // 7 executor metrics updates (3 combined max metrics and 4 original metrics)
+ assert(eventLogger.loggedEvents.size === 15)
+ eventLogger.stop()
+
+ val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
+ val lines = readLines(logData)
+ Utils.tryWithSafeFinally {
+      // in total there are 16 lines: the SparkListenerLogStart event plus the 15 events above
+ assert(lines.size === 16)
+
+ val listenerBus = new LiveListenerBus
+ val memoryListener = new MemoryListener
+ listenerBus.addListener(memoryListener)
+
+ val sparkEvents: Seq[SparkListenerEvent] = lines.map { line =>
+ val event = JsonProtocol.sparkEventFromJson(parse(line))
+ listenerBus.postToAll(event)
+ event
+ }
+
+      // Make sure there is always an original {{SparkListenerExecutorMetricsUpdate}} event
+      // logged before each stage completes.
+ val latestMetricsStage1 = getLatestExecutorMetricsBeforeStageEnd(sparkEvents, 1).get
+ val latestMetricsStage2 = getLatestExecutorMetricsBeforeStageEnd(sparkEvents, 2).get
+ val latestMetricsStage3 = getLatestExecutorMetricsBeforeStageEnd(sparkEvents, 3).get
+ assertMetrics(execId, (hostName, 2L, 30, 10), latestMetricsStage1)
+ assertMetrics(execId, (hostName, 4L, 20, 25), latestMetricsStage2)
+ assertMetrics(execId, (hostName, 6L, 25, 10), latestMetricsStage3)
+
+      // The following is an integration test with [[org.apache.spark.ui.memory.MemoryListener]]
+      // to make sure the events logged in the history file replay correctly.
+ val mapForStage1 = memoryListener.completedStagesToMem((1, 0))
+ val transMetrics1 = mapForStage1(execId).transportInfo.get
+ MemoryListenerSuite.assertTransMetrics(
+ 30, 10, MemTime(30, 2L), MemTime(10, 2L), transMetrics1)
+ val mapForStage2 = memoryListener.completedStagesToMem((2, 0))
+ val transMetrics2 = mapForStage2(execId).transportInfo.get
+ MemoryListenerSuite.assertTransMetrics(
+ 20, 25, MemTime(30, 3L), MemTime(30, 3L), transMetrics2)
+ val mapForStage3 = memoryListener.completedStagesToMem((3, 0))
+ val transMetrics3 = mapForStage3(execId).transportInfo.get
+ MemoryListenerSuite.assertTransMetrics(
+ 25, 10, MemTime(30, 3L), MemTime(30, 3L), transMetrics3)
+ } {
+ logData.close()
+ }
+
+ def getLatestExecutorMetricsBeforeStageEnd(
+ events: Seq[SparkListenerEvent],
+ stageId: Int): Option[SparkListenerExecutorMetricsUpdate] = {
+ val itr = events.iterator
+ var latestMetrics: Option[SparkListenerExecutorMetricsUpdate] = None
+ var isStageSubmitted: Boolean = false
+      while (itr.hasNext) {
+ val event = itr.next()
+ event match {
+ case ss: SparkListenerStageSubmitted if ss.stageInfo.stageId == stageId =>
+ isStageSubmitted = true
+ case sc: SparkListenerStageCompleted if sc.stageInfo.stageId == stageId =>
+ return latestMetrics
+ case emu: SparkListenerExecutorMetricsUpdate if isStageSubmitted =>
+ latestMetrics = Some(emu)
+ case _ => // Do nothing for other events
+ }
+ }
+ latestMetrics
+ }
+
+ def assertMetrics(
+ execId: String,
+ metricsDetails: (String, Long, Long, Long),
+ event: SparkListenerExecutorMetricsUpdate): Unit = {
+ val execMetrics = event.executorMetrics
+ assert(execId === event.execId)
+ assert(metricsDetails._1 === execMetrics.hostname)
+ assert(metricsDetails._2 === execMetrics.transportMetrics.timeStamp)
+ assert(metricsDetails._3 === execMetrics.transportMetrics.onHeapSize)
+ assert(metricsDetails._4 === execMetrics.transportMetrics.offHeapSize)
+ }
+ }
+
/* ----------------- *
* Actual test logic *
* ----------------- */
@@ -156,7 +285,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
// Verify file contains exactly the two events logged
val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
- try {
+ Utils.tryWithSafeFinally {
val lines = readLines(logData)
val logStart = SparkListenerLogStart(SPARK_VERSION)
assert(lines.size === 3)
@@ -166,7 +295,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationStart)
assert(JsonProtocol.sparkEventFromJson(parse(lines(2))) === applicationEnd)
- } finally {
+ } {
logData.close()
}
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index e02f5a1b20fe3..9d6dc6f2382d5 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -302,7 +302,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1236L)))
listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
- listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
+ listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, null, Array(
(1234L, 0, 0, makeTaskMetrics(0)),
(1235L, 0, 0, makeTaskMetrics(100)),
(1236L, 1, 0, makeTaskMetrics(200)))))
diff --git a/core/src/test/scala/org/apache/spark/ui/memory/MemoryListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/memory/MemoryListenerSuite.scala
new file mode 100644
index 0000000000000..d6ed2d9493cc2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/memory/MemoryListenerSuite.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.memory
+
+import org.apache.spark._
+import org.apache.spark.executor._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster._
+
+class MemoryListenerSuite extends SparkFunSuite {
+
+ test("test stages use executor metrics updated in previous stages") {
+ val listener = new MemoryListener
+ val execId1 = "exec-1"
+ val host1 = "host-1"
+ val port: Option[Int] = Some(80)
+
+ listener.onExecutorAdded(
+ SparkListenerExecutorAdded(1L, execId1, new ExecutorInfo(host1, 1, Map.empty)))
+
+ // stage 1, no metrics update
+ listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(1))
+ listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(1))
+
+ // multiple metrics updated in stage 2
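+ // each update carries (timeStamp, onHeapSize, offHeapSize); the listener keeps the latest
+ // values and tracks the peak on-heap and off-heap sizes together with their timestamps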
+ listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(2))
+ val execMetrics1 = MemoryListenerSuite.createExecutorMetrics(host1, port, 2L, 20, 10)
+ listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+ execId1, execMetrics1))
+ val execMetrics2 = MemoryListenerSuite.createExecutorMetrics(host1, port, 3L, 30, 5)
+ listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+ execId1, execMetrics2))
+ val execMetrics3 = MemoryListenerSuite.createExecutorMetrics(host1, port, 4L, 15, 15)
+ listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+ execId1, execMetrics3))
+ listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(2))
+
+ // no metrics updates arrive during stage 3 and stage 4
+ listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(3))
+ listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(3))
+ listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(4))
+ listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(4))
+
+ // stage 1 has no transport metrics since no update arrived while it was running
+ val mapForStage1 = listener.completedStagesToMem((1, 0))
+ assert(mapForStage1(execId1).transportInfo === None)
+
+ // stage 2 aggregates its multiple updates: the latest sizes are kept, together with the
+ // peak on-heap (30 at t=3) and peak off-heap (15 at t=4) values
+ val mapForStage2 = listener.completedStagesToMem((2, 0))
+ val transMetrics2 = mapForStage2(execId1).transportInfo.get
+ MemoryListenerSuite.assertTransMetrics(
+ 15, 15, MemTime(30, 3), MemTime(15, 4), transMetrics2)
+
+ // both stage 3 and stage 4 will use the metrics last updated in stage 2
+ val mapForStage3 = listener.completedStagesToMem((3, 0))
+ val memInfo3 = mapForStage3(execId1)
+ assert(memInfo3.transportInfo.isDefined)
+ val transMetrics3 = memInfo3.transportInfo.get
+ MemoryListenerSuite.assertTransMetrics(
+ 15, 15, MemTime(15, 4), MemTime(15, 4), transMetrics3)
+
+ val mapForStage4 = listener.completedStagesToMem((4, 0))
+ val memInfo4 = mapForStage4(execId1)
+ assert(memInfo4.transportInfo.isDefined)
+ val transMetrics4 = memInfo4.transportInfo.get
+ MemoryListenerSuite.assertTransMetrics(
+ 15, 15, MemTime(15, 4), MemTime(15, 4), transMetrics4)
+
+ listener.onExecutorRemoved(SparkListenerExecutorRemoved(0L, execId1, ""))
+ }
+
+ test("test multiple executors with multiple stages") {
+ val listener = new MemoryListener
+ val (execId1, execId2, execId3) = ("exec-1", "exec-2", "exec-3")
+ val (host1, host2, host3) = ("host-1", "host-2", "host-3")
+ val (port1, port2, port3): (Option[Int], Option[Int], Option[Int]) =
+ (Some(80), Some(80), Some(80))
+
+ // two executors added first
+ listener.onExecutorAdded(
+ SparkListenerExecutorAdded(1L, execId1, new ExecutorInfo(host1, 1, Map.empty)))
+ listener.onExecutorAdded(
+ SparkListenerExecutorAdded(2L, execId2, new ExecutorInfo(host2, 1, Map.empty)))
+
+ // three executors run in stage 1, and one of them is removed before the stage completes
+ listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(1))
+ val exec1Metrics = MemoryListenerSuite.createExecutorMetrics(host1, port1, 3L, 20, 10)
+ listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+ execId1, exec1Metrics))
+ val exec2Metrics = MemoryListenerSuite.createExecutorMetrics(host2, port2, 4L, 15, 5)
+ listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+ execId2, exec2Metrics))
+ // one more executor is added while the stage is running
+ listener.onExecutorAdded(
+ SparkListenerExecutorAdded(0L, execId3, new ExecutorInfo(host3, 1, Map.empty)))
+ val exec3Metrics = MemoryListenerSuite.createExecutorMetrics(host3, port3, 5L, 30, 15)
+ listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+ execId3, exec3Metrics))
+
+ assert(listener.activeExecutorIdToMem.size === 3)
+ assert(listener.removedExecutorIdToMem.isEmpty)
+
+ // executor 2 is removed before the stage completes
+ listener.onExecutorRemoved(SparkListenerExecutorRemoved(6L, execId2, ""))
+ listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(1))
+ (2 to 3).foreach { i =>
+ listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(i))
+ listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(i))
+ }
+
+ // all stages have completed, so activeStagesToMem is empty now
+ assert(listener.activeStagesToMem.isEmpty)
+
+ listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(4))
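+ // post an (empty) metrics update for the removed executor 2 while stage 4 is running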
+ listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+ execId2, new ExecutorMetrics))
+
+ assert(listener.activeExecutorIdToMem.size === 3)
+ assert(listener.activeStagesToMem.size === 1)
+
+ listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(4))
+
+ assert(listener.activeStagesToMem.isEmpty)
+ assert(listener.completedStagesToMem.size === 4)
+ assert(listener.removedExecutorIdToMem.size === 1)
+
+ listener.onExecutorRemoved(SparkListenerExecutorRemoved(7L, execId1, ""))
+ listener.onExecutorRemoved(SparkListenerExecutorRemoved(8L, execId3, ""))
+
+ assert(listener.removedExecutorIdToMem.size === 3)
+
+ // `completedStagesToMem` retains the metrics of both removed executors and newly
+ // added executors
+ val mapForStage1 = listener.completedStagesToMem((1, 0))
+ assert(mapForStage1.size === 3)
+
+ val transMetrics1 = mapForStage1(execId1).transportInfo.get
+ val transMetrics2 = mapForStage1(execId2).transportInfo.get
+ val transMetrics3 = mapForStage1(execId3).transportInfo.get
+
+ MemoryListenerSuite.assertTransMetrics(
+ 20, 10, MemTime(20, 3), MemTime(10, 3), transMetrics1)
+ MemoryListenerSuite.assertTransMetrics(
+ 15, 5, MemTime(15, 4), MemTime(5, 4), transMetrics2)
+ MemoryListenerSuite.assertTransMetrics(
+ 30, 15, MemTime(30, 5), MemTime(15, 5), transMetrics3)
+ }
+}
+
+object MemoryListenerSuite extends SparkFunSuite {
+ def createStageStartEvent(stageId: Int): SparkListenerStageSubmitted = {
+ val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, Seq.empty, Seq.empty, "")
+ SparkListenerStageSubmitted(stageInfo)
+ }
+
+ def createStageEndEvent(stageId: Int): SparkListenerStageCompleted = {
+ val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, Seq.empty, Seq.empty, "")
+ SparkListenerStageCompleted(stageInfo)
+ }
+
+ def createExecutorMetricsUpdateEvent(
+ execId: String,
+ executorMetrics: ExecutorMetrics): SparkListenerExecutorMetricsUpdate = {
+ SparkListenerExecutorMetricsUpdate(execId, executorMetrics, Seq.empty)
+ }
+
+ def createExecutorMetrics(
+ hostname: String,
+ port: Option[Int],
+ timeStamp: Long,
+ onHeapSize: Long,
+ offHeapSize: Long): ExecutorMetrics = {
+ ExecutorMetrics(hostname, port, TransportMetrics(timeStamp, onHeapSize, offHeapSize))
+ }
+
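+ // expected values are (current on-heap, current off-heap, peak on-heap MemTime,
+ // peak off-heap MemTime)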
+ def assertTransMetrics(
+ onHeapSize: Long,
+ offHeapSize: Long,
+ peakOnHeapSizeTime: MemTime,
+ peakOffHeapSizeTime: MemTime,
+ transMemSize: TransportMemSize): Unit = {
+ assert(onHeapSize === transMemSize.onHeapSize)
+ assert(offHeapSize === transMemSize.offHeapSize)
+ assert(peakOnHeapSizeTime === transMemSize.peakOnHeapSizeTime)
+ assert(peakOffHeapSizeTime === transMemSize.peakOffHeapSizeTime)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6566400e63799..aff63fa6c3dca 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -82,8 +82,15 @@ class JsonProtocolSuite extends SparkFunSuite {
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
- val executorMetricsUpdate = SparkListenerExecutorMetricsUpdate("exec3", Seq(
- (1L, 2, 3, makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800,
+ val executorMetrics = {
+ val execMetrics = new ExecutorMetrics
+ execMetrics.setHostname("host-1")
+ execMetrics.setPort(Some(80))
+ execMetrics.setTransportMetrics(TransportMetrics(0L, 10, 10))
+ execMetrics
+ }
+ val executorMetricsUpdate = SparkListenerExecutorMetricsUpdate("exec3", executorMetrics,
+ Seq((1L, 2, 3, makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800,
hasHadoopInput = true, hasOutput = true))))
testEvent(stageSubmitted, stageSubmittedJsonString)
@@ -379,6 +386,23 @@ class JsonProtocolSuite extends SparkFunSuite {
assert(false === oldInfo.internal)
}
+ test("ExecutorMetrics backward compatibility") {
+ // ExecutorMetrics is newly added; event logs written by older versions lack the
+ // "Executor Metrics Updated" field, so parsing them must fall back to default values
+ val executorMetricsUpdate = SparkListenerExecutorMetricsUpdate("exec3", new ExecutorMetrics,
+ Seq((1L, 2, 3, makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800,
+ hasHadoopInput = true, hasOutput = true))))
+ assert(executorMetricsUpdate.executorMetrics != null)
+ assert(executorMetricsUpdate.executorMetrics.transportMetrics != null)
+ val newJson = JsonProtocol.executorMetricsUpdateToJson(executorMetricsUpdate)
+ val oldJson = newJson.removeField { case (field, _) => field == "Executor Metrics Updated" }
+ val newMetrics = JsonProtocol.executorMetricsUpdateFromJson(oldJson)
+ assert(newMetrics.executorMetrics.hostname === "")
+ assert(newMetrics.executorMetrics.port === None)
+ assert(newMetrics.executorMetrics.transportMetrics.onHeapSize === 0L)
+ assert(newMetrics.executorMetrics.transportMetrics.offHeapSize === 0L)
+ assert(newMetrics.executorMetrics.transportMetrics.timeStamp != 0L)
+ }
+
/** -------------------------- *
| Helper test running methods |
* --------------------------- */
@@ -1692,6 +1716,15 @@ class JsonProtocolSuite extends SparkFunSuite {
|{
| "Event": "SparkListenerExecutorMetricsUpdate",
| "Executor ID": "exec3",
+ | "Executor Metrics Updated": {
+ | "Executor Hostname": "host-1",
+ | "Executor Port": 80,
+ | "TransportMetrics": {
+ | "TimeStamp": 0,
+ | "OnHeapSize": 10,
+ | "OffHeapSize": 10
+ | }
+ | },
| "Metrics Updated": [
| {
| "Task ID": 1,
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 61bafc8380049..b29ba6e199d50 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -88,6 +88,10 @@ public ClientPool(int size) {
private EventLoopGroup workerGroup;
private PooledByteBufAllocator pooledAllocator;
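+ /** Exposes the client factory's pooled allocator so transport memory usage can be sampled. */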
+ public PooledByteBufAllocator getPooledAllocator() {
+ return pooledAllocator;
+ }
+
public TransportClientFactory(
TransportContext context,
List<TransportClientBootstrap> clientBootstraps) {
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index baae235e02205..165d2d8a88ecb 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -53,12 +53,17 @@ public class TransportServer implements Closeable {
private ServerBootstrap bootstrap;
private ChannelFuture channelFuture;
+ private PooledByteBufAllocator allocator;
private int port = -1;
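+ /** Exposes the server's pooled allocator so transport memory usage can be sampled. */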
+ public PooledByteBufAllocator getAllocator() {
+ return allocator;
+ }
+
/**
* Creates a TransportServer that binds to the given host and the given port, or to any available
* if 0. If you don't want to bind to any special host, set "hostToBind" to null.
- * */
+ **/
public TransportServer(
TransportContext context,
String hostToBind,
@@ -92,7 +97,7 @@ private void init(String hostToBind, int portToBind) {
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
EventLoopGroup workerGroup = bossGroup;
- PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
+ allocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
bootstrap = new ServerBootstrap()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index eef3c1f3e34d9..208ae4051b2c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -113,7 +113,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
assert(listener.getExecutionMetrics(0).isEmpty)
- listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+ listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", null, Seq(
// (task id, stage id, stage attempt, metrics)
(0L, 0, 0, createTaskMetrics(accumulatorUpdates)),
(1L, 0, 0, createTaskMetrics(accumulatorUpdates))
@@ -121,7 +121,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
- listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+ listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", null, Seq(
// (task id, stage id, stage attempt, metrics)
(0L, 0, 0, createTaskMetrics(accumulatorUpdates)),
(1L, 0, 0, createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)))
@@ -132,7 +132,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
// Retrying a stage should reset the metrics
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
- listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+ listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", null, Seq(
// (task id, stage id, stage attempt, metrics)
(0L, 0, 1, createTaskMetrics(accumulatorUpdates)),
(1L, 0, 1, createTaskMetrics(accumulatorUpdates))
@@ -172,7 +172,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
// Submit a new stage
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
- listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+ listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", null, Seq(
// (task id, stage id, stage attempt, metrics)
(0L, 1, 0, createTaskMetrics(accumulatorUpdates)),
(1L, 1, 0, createTaskMetrics(accumulatorUpdates))