From 1e51c73d340b331f6fcd033635e4468201ba1477 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Tue, 28 Mar 2017 14:49:38 -0500 Subject: [PATCH 1/8] Netty network layer memory usage on webUI Manual pull of the PR from: https://github.com/apache/spark/pull/7753/commits Created with: git checkout upstream/pr/7753 -- . A bunch of manual merging happened after this. --- .../org/apache/spark/HeartbeatReceiver.scala | 6 +- .../scala/org/apache/spark/SparkEnv.scala | 6 +- .../org/apache/spark/executor/Executor.scala | 20 +- .../spark/executor/ExecutorMetrics.scala | 91 ++++++ .../spark/network/BlockTransferService.scala | 6 + .../netty/NettyBlockTransferService.scala | 41 ++- .../apache/spark/scheduler/DAGScheduler.scala | 11 +- .../scheduler/EventLoggingListener.scala | 79 +++++- .../spark/scheduler/SparkListener.scala | 4 +- .../spark/scheduler/TaskScheduler.scala | 2 + .../spark/scheduler/TaskSchedulerImpl.scala | 5 +- .../scala/org/apache/spark/ui/SparkUI.scala | 9 +- .../apache/spark/ui/memory/MemoryPage.scala | 109 +++++++ .../apache/spark/ui/memory/MemoryTab.scala | 172 +++++++++++ .../apache/spark/ui/memory/MemoryTable.scala | 143 ++++++++++ .../spark/ui/memory/StageMemoryPage.scala | 52 ++++ .../org/apache/spark/util/JsonProtocol.scala | 37 ++- .../apache/spark/HeartbeatReceiverSuite.scala | 7 +- .../spark/scheduler/DAGSchedulerSuite.scala | 3 + .../scheduler/EventLoggingListenerSuite.scala | 133 ++++++++- .../ui/jobs/JobProgressListenerSuite.scala | 2 +- .../spark/ui/memory/MemoryListenerSuite.scala | 202 +++++++++++++ .../apache/spark/util/JsonProtocolSuite.scala | 37 ++- .../client/TransportClientFactory.java | 268 ++++++++++++++++++ .../spark/network/server/TransportServer.java | 156 ++++++++++ .../sql/execution/ui/SQLListenerSuite.scala | 8 +- 26 files changed, 1577 insertions(+), 32 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala create mode 100644 core/src/main/scala/org/apache/spark/ui/memory/MemoryPage.scala create mode 100644 core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala create mode 100644 core/src/main/scala/org/apache/spark/ui/memory/MemoryTable.scala create mode 100644 core/src/main/scala/org/apache/spark/ui/memory/StageMemoryPage.scala create mode 100644 core/src/test/scala/org/apache/spark/ui/memory/MemoryListenerSuite.scala create mode 100644 network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java create mode 100644 network/common/src/main/java/org/apache/spark/network/server/TransportServer.java diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 5242ab6f55235..d0050e988ea79 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -22,6 +22,7 @@ import java.util.concurrent.{ScheduledFuture, TimeUnit} import scala.collection.mutable import scala.concurrent.Future +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.internal.Logging import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.scheduler._ @@ -36,6 +37,7 @@ import org.apache.spark.util._ */ private[spark] case class Heartbeat( executorId: String, + executorMetrics: ExecutorMetrics, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates blockManagerId: BlockManagerId) @@ -120,14 +122,14 @@ 
private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) context.reply(true) // Messages received from executors - case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) => + case heartbeat @ Heartbeat(executorId, executorMetrics, accumUpdates, blockManagerId) => if (scheduler != null) { if (executorLastSeen.contains(executorId)) { executorLastSeen(executorId) = clock.getTimeMillis() eventLoopThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { val unknownExecutor = !scheduler.executorHeartbeatReceived( - executorId, accumUpdates, blockManagerId) + executorId, executorMetrics, accumUpdates, blockManagerId) val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor) context.reply(response) } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 539dbb55eeff0..018a181e0edbd 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -20,11 +20,10 @@ package org.apache.spark import java.io.File import java.net.Socket +import com.google.common.collect.MapMaker import scala.collection.mutable import scala.util.Properties -import com.google.common.collect.MapMaker - import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.python.PythonWorkerFactory import org.apache.spark.broadcast.BroadcastManager @@ -32,6 +31,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager} import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.network.BlockTransferService import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator} @@ -62,6 +62,7 @@ class SparkEnv ( val mapOutputTracker: MapOutputTracker, val shuffleManager: ShuffleManager, val broadcastManager: BroadcastManager, + val blockTransferService: BlockTransferService, val blockManager: BlockManager, val securityManager: SecurityManager, val metricsSystem: MetricsSystem, @@ -381,6 +382,7 @@ object SparkEnv extends Logging { mapOutputTracker, shuffleManager, broadcastManager, + blockTransferService, blockManager, securityManager, metricsSystem, diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 99b1608010ddb..7a2c2fa90ceae 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -102,6 +102,12 @@ private[spark] class Executor( env.blockManager.initialize(conf.getAppId) } + private val executorMetrics: ExecutorMetrics = new ExecutorMetrics + executorMetrics.setHostname(Utils.localHostName) + if (env.rpcEnv.address != null) { + executorMetrics.setPort(Some(env.rpcEnv.address.port)) + } + // Whether to load classes in user jars before those in Spark jars private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false) @@ -704,7 +710,19 @@ private[spark] class Executor( } } - val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId) + env.blockTransferService.getMemMetrics(this.executorMetrics) + val executorMetrics = if (isLocal) { + // JobProgressListener might hold a reference of it during onExecutorMetricsUpdate() + // 
in local mode; if the same instance were mutated later, the listener would not
+          // see a stable snapshot of the metrics, so make a deep copy of it here before sending.
+          Utils.deserialize[ExecutorMetrics](Utils.serialize(this.executorMetrics))
+        } else {
+          this.executorMetrics
+        }
+
+    val message = Heartbeat(
+      executorId, executorMetrics, accumUpdates.toArray, env.blockManager.blockManagerId)
+
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
           message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
new file mode 100644
index 0000000000000..d63209f52b877
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked during the execution of an executor.
+ *
+ * So, when adding new fields, take into consideration that the whole object can be serialized
+ * for shipping off at any time to consumers of the SparkListener interface.
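+ *
+ * A rough sketch (for illustration only, not part of this patch) of how the executor fills
+ * one in before each heartbeat, where `env` is the executor-side SparkEnv:
+ * {{{
+ *   val metrics = new ExecutorMetrics
+ *   metrics.setHostname(Utils.localHostName)
+ *   metrics.setPort(Some(env.rpcEnv.address.port))
+ *   env.blockTransferService.getMemMetrics(metrics)  // fills in transportMetrics
+ * }}}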
+ */ +@DeveloperApi +class ExecutorMetrics extends Serializable { + + /** + * Host's name the executor runs on + */ + private var _hostname: String = "" + def hostname: String = _hostname + private[spark] def setHostname(value: String) = _hostname = value + + /** + * Host's port the executor runs on + */ + private var _port: Option[Int] = None + def port: Option[Int] = _port + private[spark] def setPort(value: Option[Int]) = _port = value + + private[spark] def hostPort: String = { + val hp = port match { + case None => hostname + case value => hostname + ":" + value.get + } + hp + } + + private var _transportMetrics: TransportMetrics = new TransportMetrics + def transportMetrics: TransportMetrics = _transportMetrics + private[spark] def setTransportMetrics(value: TransportMetrics) = { + _transportMetrics = value + } +} + +object ExecutorMetrics extends Serializable { + def apply( + hostName: String, + port: Option[Int], + transportMetrics: TransportMetrics): ExecutorMetrics = { + val execMetrics = new ExecutorMetrics + execMetrics.setHostname(hostName) + execMetrics.setPort(port) + execMetrics.setTransportMetrics(transportMetrics) + execMetrics + } +} + +/** + * :: DeveloperApi :: + * Metrics for network layer + */ +@DeveloperApi +class TransportMetrics ( + val timeStamp: Long = System.currentTimeMillis, + val onHeapSize: Long = 0L, + val offHeapSize: Long = 0L) extends Serializable + +object TransportMetrics extends Serializable { + def apply( + timeStamp: Long, + onHeapSize: Long, + offHeapSize: Long): TransportMetrics = { + new TransportMetrics(timeStamp, onHeapSize, offHeapSize) + } +} diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala index cb9d389dd7ea6..ebe4dbc95413a 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala @@ -24,6 +24,7 @@ import scala.concurrent.{Future, Promise} import scala.concurrent.duration.Duration import scala.reflect.ClassTag +import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient} @@ -39,6 +40,11 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo */ def init(blockDataManager: BlockDataManager): Unit + /** + * Collect current executor memory metrics of transferService. + */ + private[spark] def getMemMetrics(executorMetrics: ExecutorMetrics): Unit + /** * Tear down the transfer service. 
 */
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index b75e91b660969..547e863b69ffd 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -23,7 +23,10 @@ import scala.collection.JavaConverters._
 import scala.concurrent.{Future, Promise}
 import scala.reflect.ClassTag
 
-import org.apache.spark.{SecurityManager, SparkConf}
+import io.netty.buffer._
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkEnv}
+import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory}
@@ -34,7 +37,7 @@ import org.apache.spark.network.shuffle.protocol.UploadBlock
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.storage.{BlockId, StorageLevel}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
  * A BlockTransferService that uses Netty to fetch a set of blocks at a time.
@@ -57,6 +60,40 @@ private[spark] class NettyBlockTransferService(
   private[this] var server: TransportServer = _
   private[this] var clientFactory: TransportClientFactory = _
   private[this] var appId: String = _
+  private[this] var clock: Clock = new SystemClock()
+
+  /**
+   * Use a different clock for this transfer service. This is mainly used for testing.
+   */
+  def setClock(newClock: Clock): Unit = {
+    clock = newClock
+  }
+
+  private[spark] override def getMemMetrics(executorMetrics: ExecutorMetrics): Unit = {
+    val currentTime = clock.getTimeMillis()
+    val clientPooledAllocator = clientFactory.getPooledAllocator()
+    val serverAllocator = server.getAllocator()
+    val clientOffHeapSize: Long = sumOfMetrics(convToScala(clientPooledAllocator.directArenas()))
+    val clientOnHeapSize: Long = sumOfMetrics(convToScala(clientPooledAllocator.heapArenas()))
+    val serverOffHeapSize: Long = sumOfMetrics(convToScala(serverAllocator.directArenas()))
+    val serverOnHeapSize: Long = sumOfMetrics(convToScala(serverAllocator.heapArenas()))
+    logDebug(s"Current Netty client off-heap size is $clientOffHeapSize, " +
+      s"client heap size is $clientOnHeapSize, server off-heap size is $serverOffHeapSize, " +
+      s"server heap size is $serverOnHeapSize, executor id is " +
+      s"${SparkEnv.get.blockManager.blockManagerId.executorId}")
+    executorMetrics.setTransportMetrics(TransportMetrics(currentTime,
+      clientOnHeapSize + serverOnHeapSize, clientOffHeapSize + serverOffHeapSize))
+  }
+
+  private def convToScala = (x: java.util.List[PoolArenaMetric]) => x.asScala
+
+  private def sumOfMetrics(arenaMetricList: Seq[PoolArenaMetric]): Long = {
+    arenaMetricList.map { arena =>
+      arena.chunkLists().asScala.map { chunk =>
+        chunk.iterator().asScala.map(_.chunkSize()).sum
+      }.sum
+    }.sum
+  }
 
   override def init(blockDataManager: BlockDataManager): Unit = {
     val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 09717316833a7..23357502feb4c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -34,7 +34,7 @@ import org.apache.commons.lang3.SerializationUtils
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
@@ -229,16 +229,17 @@ class DAGScheduler(
   }
 
   /**
-   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
-   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
-   * indicating that the block manager should re-register.
+   * Update metrics for live executors and in-progress tasks and let the master know that the
+   * BlockManager is still alive. Return true if the driver knows about the given block manager.
+   * Otherwise, return false, indicating that the block manager should re-register.
    */
   def executorHeartbeatReceived(
       execId: String,
+      executorMetrics: ExecutorMetrics,
       // (taskId, stageId, stageAttemptId, accumUpdates)
       accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
       blockManagerId: BlockManagerId): Boolean = {
-    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
+    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, executorMetrics, accumUpdates))
     blockManagerMaster.driverEndpoint.askSync[Boolean](
       BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index af9bdefc967ef..2fe76361bed7b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -22,7 +22,7 @@ import java.net.URI
 import java.nio.charset.StandardCharsets
 
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
@@ -32,6 +32,7 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{SPARK_VERSION, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{JsonProtocol, Utils}
@@ -86,6 +87,10 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
 
+  private val executorIdToLatestMetrics = new HashMap[String, SparkListenerExecutorMetricsUpdate]
+  private val executorIdToModifiedMaxMetrics =
+    new HashMap[String, SparkListenerExecutorMetricsUpdate]
+
   /**
    * Creates the log file in the configured log directory.
    */
@@ -144,8 +149,25 @@ private[spark] class EventLoggingListener(
     }
   }
 
+  // When a stage is submitted and when it completes, we update our executor memory metrics for
+  // that stage, and then log the metrics. Anytime we receive more executor metrics, we update
+  // our running set of {{executorIdToLatestMetrics}} and {{executorIdToModifiedMaxMetrics}}.
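+  // (For example, an executor whose Netty on-heap usage peaks at 30 MB mid-stage but is back
+  // to 10 MB by its last heartbeat is logged with both the 30 MB maximum and the 10 MB latest
+  // value when the stage boundary is hit.)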
+  // Since stage submission and completion times might be interleaved, we maintain both the
+  // latest and the max metrics for each such time segment. So, on each stage start and stage
+  // completion, we replace each item in {{executorIdToModifiedMaxMetrics}} with the
+  // corresponding item in {{executorIdToLatestMetrics}}.
+  private def updateAndLogExecutorMemoryMetrics(): Unit = {
+    executorIdToModifiedMaxMetrics.foreach { case (_, metrics) => logEvent(metrics) }
+    // Clear the modified metrics map after each log action
+    executorIdToModifiedMaxMetrics.clear()
+    executorIdToLatestMetrics.foreach { case (_, metrics) => logEvent(metrics) }
+  }
+
   // Events that do not trigger a flush
-  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event)
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+    updateAndLogExecutorMemoryMetrics()
+    logEvent(event)
+  }
 
   override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event)
@@ -159,6 +181,7 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+    updateAndLogExecutorMemoryMetrics()
     logEvent(event, flushLogger = true)
   }
@@ -185,11 +208,14 @@
   override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
     logEvent(event, flushLogger = true)
   }
+
   override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
     logEvent(event, flushLogger = true)
   }
 
   override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
+    executorIdToLatestMetrics.remove(event.executorId)
+    executorIdToModifiedMaxMetrics.remove(event.executorId)
     logEvent(event, flushLogger = true)
   }
@@ -212,8 +238,15 @@
   // No-op because logging every update would be overkill
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
+  // Track executor metrics for logging on stage start and end
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
+    // We only track the executor metrics in each stage, so we drop the task metrics as they
+    // are quite verbose
+    val eventWithoutTaskMetrics = SparkListenerExecutorMetricsUpdate(
+      event.execId, event.executorMetrics, Seq.empty)
+    executorIdToLatestMetrics(eventWithoutTaskMetrics.execId) = eventWithoutTaskMetrics
+    updateModifiedMetrics(eventWithoutTaskMetrics)
+  }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     if (event.logEvent) {
@@ -258,6 +291,44 @@
     SparkListenerEnvironmentUpdate(redactedEnvironmentDetails)
   }
 
+  /**
+   * Update the maintained per-executor max metrics according to the latest received event.
+   * @param latestEvent the latest metrics event received, used to update the maintained maximum
+   */
+  private def updateModifiedMetrics(latestEvent: SparkListenerExecutorMetricsUpdate): Unit = {
+    val executorId = latestEvent.execId
+    val toBeModifiedEvent = executorIdToModifiedMaxMetrics.get(executorId)
+    toBeModifiedEvent match {
+      case None =>
+        executorIdToModifiedMaxMetrics(executorId) = latestEvent
+      case Some(toBeModifiedEvent) =>
+        val latestTransMetrics = latestEvent.executorMetrics.transportMetrics
+        val toBeModTransMetrics = toBeModifiedEvent.executorMetrics.transportMetrics
+        var timeStamp: Long =
toBeModTransMetrics.timeStamp + // the logic here should be the same with that for memoryListener + val onHeapSize = if (latestTransMetrics.onHeapSize > toBeModTransMetrics.onHeapSize) { + timeStamp = latestTransMetrics.timeStamp + latestTransMetrics.onHeapSize + } else { + toBeModTransMetrics.onHeapSize + } + val offHeapSize = if (latestTransMetrics.offHeapSize > toBeModTransMetrics.offHeapSize) { + timeStamp = latestTransMetrics.timeStamp + latestTransMetrics.offHeapSize + } else { + toBeModTransMetrics.offHeapSize + } + + // We should maintain a new instance for each update to avoid side-effect + val modifiedExecMetrics = ExecutorMetrics(toBeModifiedEvent.executorMetrics.hostname, + toBeModifiedEvent.executorMetrics.port, + TransportMetrics(timeStamp, onHeapSize, offHeapSize)) + val modifiedEvent = SparkListenerExecutorMetricsUpdate( + toBeModifiedEvent.execId, modifiedExecMetrics, toBeModifiedEvent.accumUpdates) + executorIdToModifiedMaxMetrics(executorId) = modifiedEvent + } + } } private[spark] object EventLoggingListener extends Logging { diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 4331addb44172..4ead09a27df2e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -26,7 +26,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo import org.apache.spark.{SparkConf, TaskEndReason} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo} import org.apache.spark.ui.SparkUI @@ -133,11 +133,13 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends /** * Periodic updates from executors. 
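+ *
+ * A hypothetical consumer sketch (names follow the fields defined below):
+ * {{{
+ *   override def onExecutorMetricsUpdate(u: SparkListenerExecutorMetricsUpdate): Unit = {
+ *     val transport = u.executorMetrics.transportMetrics
+ *     println(s"${u.execId}: onHeap=${transport.onHeapSize}, offHeap=${transport.offHeapSize}")
+ *   }
+ * }}}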
* @param execId executor id + * @param executorMetrics metrics in executor level * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates) */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, + executorMetrics: ExecutorMetrics, accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) extends SparkListenerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 3de7d1f7de22b..6cede660ffdb7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler +import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.AccumulatorV2 @@ -74,6 +75,7 @@ private[spark] trait TaskScheduler { */ def executorHeartbeatReceived( execId: String, + executorMetrics: ExecutorMetrics, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 07aea773fa632..82c2f63405e06 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -28,6 +28,7 @@ import scala.util.Random import org.apache.spark._ import org.apache.spark.TaskState.TaskState +import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.Logging import org.apache.spark.internal.config import org.apache.spark.scheduler.SchedulingMode.SchedulingMode @@ -450,6 +451,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( */ override def executorHeartbeatReceived( execId: String, + executorMetrics: ExecutorMetrics, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = { // (taskId, stageId, stageAttemptId, accumUpdates) @@ -461,7 +463,8 @@ private[spark] class TaskSchedulerImpl private[scheduler]( } } } - dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId) + dagScheduler.executorHeartbeatReceived( + execId, executorMetrics, accumUpdatesWithTaskIds, blockManagerId) } def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 7d31ac54a7177..eaf85752c319f 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -31,6 +31,7 @@ import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab} import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab} import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab} +import org.apache.spark.ui.memory.{MemoryListener, MemoryTab} import org.apache.spark.ui.scope.RDDOperationGraphListener import org.apache.spark.ui.storage.{StorageListener, StorageTab} import org.apache.spark.util.Utils @@ -47,6 +48,7 @@ private[spark] class SparkUI private ( val executorsListener: ExecutorsListener, val jobProgressListener: JobProgressListener, val storageListener: StorageListener, + val memoryListener: 
MemoryListener, val operationGraphListener: RDDOperationGraphListener, var appName: String, val basePath: String, @@ -71,6 +73,7 @@ private[spark] class SparkUI private ( attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this)) attachTab(new ExecutorsTab(this)) + attachTab(new MemoryTab(this)) attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath)) attachHandler(ApiRootResource.getServletHandler(this)) @@ -210,16 +213,18 @@ private[spark] object SparkUI { val storageStatusListener = new StorageStatusListener(conf) val executorsListener = new ExecutorsListener(storageStatusListener, conf) val storageListener = new StorageListener(storageStatusListener) + val memoryListener = new MemoryListener val operationGraphListener = new RDDOperationGraphListener(conf) listenerBus.addListener(environmentListener) listenerBus.addListener(storageStatusListener) listenerBus.addListener(executorsListener) listenerBus.addListener(storageListener) + listenerBus.addListener(memoryListener) listenerBus.addListener(operationGraphListener) new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, - executorsListener, _jobProgressListener, storageListener, operationGraphListener, - appName, basePath, startTime) + executorsListener, _jobProgressListener, storageListener, memoryListener, + operationGraphListener, appName, basePath, startTime) } } diff --git a/core/src/main/scala/org/apache/spark/ui/memory/MemoryPage.scala b/core/src/main/scala/org/apache/spark/ui/memory/MemoryPage.scala new file mode 100644 index 0000000000000..b674f074fe62e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/memory/MemoryPage.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ui.memory + +import javax.servlet.http.HttpServletRequest + +import scala.xml.{Node, NodeSeq} + +import org.apache.spark.ui.{UIUtils, WebUIPage} + +private[ui] class MemoryPage(parent: MemoryTab) extends WebUIPage("") { + private val memoryListener = parent.memoryListener + private val progressListener = parent.progressListener + + def render(request: HttpServletRequest): Seq[Node] = { + + val activeExecutorIdToMem = memoryListener.activeExecutorIdToMem + val removedExecutorIdToMem = memoryListener.removedExecutorIdToMem + val completedStages = progressListener.completedStages.reverse.toSeq + val failedStages = progressListener.failedStages.reverse.toSeq + val numberCompletedStages = progressListener.numCompletedStages + val numberFailedStages = progressListener.numFailedStages + val activeMemInfoSorted = activeExecutorIdToMem.toSeq.sortBy(_._1) + val removedMemInfoSorted = removedExecutorIdToMem.toSeq.sortBy(_._1) + val shouldShowActiveExecutors = activeExecutorIdToMem.nonEmpty + val shouldShowRemovedExecutors = removedExecutorIdToMem.nonEmpty + val shouldShowCompletedStages = completedStages.nonEmpty + val shouldShowFailedStages = failedStages.nonEmpty + + val activeExecMemTable = new MemTableBase(activeMemInfoSorted, memoryListener) + val removedExecMemTable = new MemTableBase(removedMemInfoSorted, memoryListener) + val completedStagesTable = new StagesTableBase( + completedStages, parent.basePath, progressListener) + val failedStagesTable = new StagesTableBase(failedStages, parent.basePath, progressListener) + + val summary: NodeSeq = +
+      <div>
+      </div>
+
+    var content = summary
+    if (shouldShowActiveExecutors) {
+      content ++= <h4 id="activeExecutors">Active Executors ({activeExecutorIdToMem.size})</h4> ++
+        activeExecMemTable.toNodeSeq
+    }
+    if (shouldShowRemovedExecutors) {
+      content ++= <h4 id="removedExecutors">Removed Executors ({removedMemInfoSorted.size})</h4> ++
+        removedExecMemTable.toNodeSeq
+    }
+    if (shouldShowCompletedStages) {
+      content ++= <h4 id="completedStages">Completed Stages ({numberCompletedStages})</h4> ++
+        completedStagesTable.toNodeSeq
+    }
+    if (shouldShowFailedStages) {
+      content ++= <h4 id="failedStages">Failed Stages ({numberFailedStages})</h4> ++
+        failedStagesTable.toNodeSeq
+    }
+
+    UIUtils.headerSparkPage("Memory Usage", content, parent)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala
new file mode 100644
index 0000000000000..cc23334860a5a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.memory
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.ui.{SparkUI, SparkUITab}
+
+private[ui] class MemoryTab(parent: SparkUI) extends SparkUITab(parent, "memory") {
+  val memoryListener = parent.memoryListener
+  val progressListener = parent.jobProgressListener
+  attachPage(new MemoryPage(this))
+  attachPage(new StageMemoryPage(this))
+}
+
+/**
+ * :: DeveloperApi ::
+ * A SparkListener that prepares information to be displayed on the MemoryTab
+ */
+@DeveloperApi
+class MemoryListener extends SparkListener {
+  type ExecutorId = String
+  val activeExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
+  // TODO There might be plenty of removed executors (e.g. in dynamic allocation mode), which
+  // may use too much memory.
+  val removedExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
+  // A map that maintains the latest metrics of each active executor
+  val latestExecIdToExecMetrics = new HashMap[ExecutorId, ExecutorMetrics]
+  // A map that maintains the memory information of all executors for each active stage, keyed
+  // by (stageId, attemptId) with a map of executorId to MemoryUIInfo as the value
+  val activeStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]]
+  // TODO We should honor the conf for retained stages so that we don't need to keep every
+  // completed stage, since there might be too many of them.
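+  // Snapshot of per-executor memory info for each completed stage, keyed by
+  // (stageId, attemptId); filled in by onStageCompleted() and rendered by StageMemoryPage.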
+  val completedStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]]
+
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
+    val executorId = event.execId
+    val executorMetrics = event.executorMetrics
+    val memoryInfo = activeExecutorIdToMem.getOrElseUpdate(executorId, new MemoryUIInfo)
+    memoryInfo.updateMemUiInfo(executorMetrics)
+    activeStagesToMem.foreach { case (_, stageMemMetrics) =>
+      // If an executor was added while the stage is running, we also update the metrics for
+      // that executor in {{activeStagesToMem}}
+      if (!stageMemMetrics.contains(executorId)) {
+        stageMemMetrics(executorId) = new MemoryUIInfo
+      }
+      stageMemMetrics(executorId).updateMemUiInfo(executorMetrics)
+    }
+    latestExecIdToExecMetrics(executorId) = executorMetrics
+  }
+
+  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
+    val executorId = event.executorId
+    activeExecutorIdToMem.put(executorId, new MemoryUIInfo(event.executorInfo))
+  }
+
+  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
+    val executorId = event.executorId
+    val info = activeExecutorIdToMem.remove(executorId)
+    latestExecIdToExecMetrics.remove(executorId)
+    removedExecutorIdToMem.getOrElseUpdate(executorId, info.getOrElse(new MemoryUIInfo))
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+    val stage = (event.stageInfo.stageId, event.stageInfo.attemptId)
+    val memInfoMap = new HashMap[ExecutorId, MemoryUIInfo]
+    activeExecutorIdToMem.foreach { case (id, _) =>
+      memInfoMap(id) = new MemoryUIInfo
+      val latestExecMetrics = latestExecIdToExecMetrics.get(id)
+      latestExecMetrics match {
+        case None => // Do nothing
+        case Some(metrics) =>
+          memInfoMap(id).updateMemUiInfo(metrics)
+      }
+    }
+    activeStagesToMem(stage) = memInfoMap
+  }
+
+  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+    val stage = (event.stageInfo.stageId, event.stageInfo.attemptId)
+    // We need to refresh {{activeStagesToMem}} with {{activeExecutorIdToMem}} in case an
+    // executor was added while the stage was running and no
+    // {{SparkListenerExecutorMetricsUpdate}} event arrived during this stage.
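+    // (e.g. an executor that registered mid-stage and never sent a heartbeat before the stage
+    // ended still gets a row here, copied from the executor-level map.)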
+ activeStagesToMem.get(stage).map { memInfoMap => + activeExecutorIdToMem.foreach { case (executorId, memUiInfo) => + if (!memInfoMap.contains(executorId)) { + memInfoMap(executorId) = new MemoryUIInfo + memInfoMap(executorId).copyMemUiInfo(memUiInfo) + } + } + } + completedStagesToMem.put(stage, activeStagesToMem.remove(stage).get) + } +} + +@DeveloperApi +class MemoryUIInfo { + var executorAddress: String = _ + var transportInfo: Option[TransportMemSize] = None + + def this(execInfo: ExecutorInfo) = { + this() + executorAddress = execInfo.executorHost + } + + def updateMemUiInfo(execMetrics: ExecutorMetrics): Unit = { + transportInfo = transportInfo match { + case Some(transportMemSize) => transportInfo + case _ => Some(new TransportMemSize) + } + executorAddress = execMetrics.hostPort + transportInfo.get.updateTransMemSize(execMetrics.transportMetrics) + } + + def copyMemUiInfo(memUiInfo: MemoryUIInfo): Unit = { + executorAddress = memUiInfo.executorAddress + transportInfo.foreach(_.copyTransMemSize(memUiInfo.transportInfo.get)) + } +} + +@DeveloperApi +class TransportMemSize { + var onHeapSize: Long = _ + var offHeapSize: Long = _ + var peakOnHeapSizeTime: MemTime = new MemTime() + var peakOffHeapSizeTime: MemTime = new MemTime() + + def updateTransMemSize(transportMetrics: TransportMetrics): Unit = { + val updatedOnHeapSize = transportMetrics.onHeapSize + val updatedOffHeapSize = transportMetrics.offHeapSize + val updateTime: Long = transportMetrics.timeStamp + onHeapSize = updatedOnHeapSize + offHeapSize = updatedOffHeapSize + if (updatedOnHeapSize >= peakOnHeapSizeTime.memorySize) { + peakOnHeapSizeTime = MemTime(updatedOnHeapSize, updateTime) + } + if (updatedOffHeapSize >= peakOffHeapSizeTime.memorySize) { + peakOffHeapSizeTime = MemTime(updatedOffHeapSize, updateTime) + } + } + + def copyTransMemSize(transMemSize: TransportMemSize): Unit = { + onHeapSize = transMemSize.onHeapSize + offHeapSize = transMemSize.offHeapSize + peakOnHeapSizeTime = MemTime(transMemSize.peakOnHeapSizeTime.memorySize, + transMemSize.peakOnHeapSizeTime.timeStamp) + peakOffHeapSizeTime = MemTime(transMemSize.peakOffHeapSizeTime.memorySize, + transMemSize.peakOffHeapSizeTime.timeStamp) + } +} + +@DeveloperApi +case class MemTime(memorySize: Long = 0L, timeStamp: Long = System.currentTimeMillis) diff --git a/core/src/main/scala/org/apache/spark/ui/memory/MemoryTable.scala b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTable.scala new file mode 100644 index 0000000000000..11a1f0dc321ef --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTable.scala @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.ui.memory
+
+import java.util.Date
+
+import scala.xml.Node
+
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.ui.jobs.JobProgressListener
+import org.apache.spark.ui.UIUtils
+import org.apache.spark.util.Utils
+
+
+private[ui] class MemTableBase(
+    memInfos: Seq[(String, MemoryUIInfo)],
+    listener: MemoryListener) {
+
+  protected def columns: Seq[Node] = {
+    <th>Executor ID</th>
+    <th>Address</th>
+    <th>Network Memory (on-heap)</th>
+    <th>Network Memory (off-heap)</th>
+    <th>Peak Network Memory (on-heap) / Time of Peak</th>
+    <th>Peak Network Memory (off-heap) / Time of Peak</th>
+  }
+
+  def toNodeSeq: Seq[Node] = {
+    listener.synchronized {
+      memTable(showRow, memInfos)
+    }
+  }
+
+  protected def memTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
+    <table class="table table-bordered table-condensed table-striped">
+      <thead>
+        {columns}
+      </thead>
+      <tbody>
+        {rows.map(r => makeRow(r))}
+      </tbody>
+    </table>
+  }
+
+  /** Render an HTML row representing an executor */
+  private def showRow(info: (String, MemoryUIInfo)): Seq[Node] = {
+    <tr>
+      <td>
+        {info._1}
+      </td>
+      <td>
+        {info._2.executorAddress}
+      </td>
+      {if (info._2.transportInfo.isDefined) {
+        <td>
+          {Utils.bytesToString(info._2.transportInfo.get.onHeapSize)}
+        </td>
+        <td>
+          {Utils.bytesToString(info._2.transportInfo.get.offHeapSize)}
+        </td>
+        <td>
+          {Utils.bytesToString(info._2.transportInfo.get.peakOnHeapSizeTime.memorySize)}
+          /
+          {UIUtils.formatDate(info._2.transportInfo.get.peakOnHeapSizeTime.timeStamp)}
+        </td>
+        <td>
+          {Utils.bytesToString(info._2.transportInfo.get.peakOffHeapSizeTime.memorySize)}
+          /
+          {UIUtils.formatDate(info._2.transportInfo.get.peakOffHeapSizeTime.timeStamp)}
+        </td>
+      } else {
+        <td>N/A</td>
+        <td>N/A</td>
+        <td>N/A</td>
+        <td>N/A</td>
+      }}
+    </tr>
+  }
+}
+
+private[ui] class StagesTableBase(
+    stageInfos: Seq[StageInfo],
+    basePath: String,
+    listener: JobProgressListener) {
+  protected def columns: Seq[Node] = {
+    <th>Stage Id</th>
+    <th>Description</th>
+    <th>Submitted</th>
+  }
+
+  def toNodeSeq: Seq[Node] = {
+    listener.synchronized {
+      stagesTable(showRow, stageInfos)
+    }
+  }
+
+  protected def stagesTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
+    <table class="table table-bordered table-condensed table-striped">
+      <thead>
+        {columns}
+      </thead>
+      <tbody>
+        {rows.map(r => makeRow(r))}
+      </tbody>
+    </table>
+  }
+
+  private def showRow(info: StageInfo): Seq[Node] = {
+    val submissionTime = info.submissionTime match {
+      case Some(t) => UIUtils.formatDate(new Date(t))
+      case None => "Unknown"
+    }
+
+    <tr>
+      <td>{info.stageId}</td>
+      <td>{makeDescription(info)}</td>
+      <td>{submissionTime}</td>
+    </tr>
+  }
+
+  private def makeDescription(s: StageInfo): Seq[Node] = {
+    val basePathUri = UIUtils.prependBaseUri(basePath)
+    val nameLinkUri = s"$basePathUri/memory/stage?id=${s.stageId}&attempt=${s.attemptId}"
+    <div>
+      <a href={nameLinkUri}>{s.name}</a>
+    </div>
+ } +} diff --git a/core/src/main/scala/org/apache/spark/ui/memory/StageMemoryPage.scala b/core/src/main/scala/org/apache/spark/ui/memory/StageMemoryPage.scala new file mode 100644 index 0000000000000..b5059dce2a522 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/memory/StageMemoryPage.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.memory + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.ui.{UIUtils, WebUIPage} + +/** Page showing memory information for a given stage */ +private[ui] class StageMemoryPage(parent: MemoryTab) extends WebUIPage("stage") { + private val memoryListener = parent.memoryListener + + def render(request: HttpServletRequest): Seq[Node] = { + memoryListener.synchronized { + val parameterId = request.getParameter("id") + require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") + + val parameterAttempt = request.getParameter("attempt") + require(parameterAttempt != null && parameterAttempt.nonEmpty, "Missing attempt parameter") + + val stage = (parameterId.toInt, parameterAttempt.toInt) + + val finishedStageToMem = memoryListener.completedStagesToMem + val content = if (finishedStageToMem.get(stage).isDefined) { + val executorIdToMem = finishedStageToMem(stage).toSeq.sortBy(_._1) + val execMemTable = new MemTableBase(executorIdToMem, memoryListener) +

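+          // Heading for this stage's per-executor memory table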
+          <h4 id="executors">Executors ({executorIdToMem.size})</h4>
++ + execMemTable.toNodeSeq + } else { + Seq.empty + } + UIUtils.headerSparkPage("Stage Detail Memory Usage", content, parent) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 2cb88919c8c83..9d018b6397dd3 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -233,9 +233,11 @@ private[spark] object JsonProtocol { def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = { val execId = metricsUpdate.execId + val executorMetrics = metricsUpdate.executorMetrics val accumUpdates = metricsUpdate.accumUpdates ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~ ("Executor ID" -> execId) ~ + ("Executor Metrics Updated" -> executorMetricsToJson(executorMetrics)) ~ ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) => ("Task ID" -> taskId) ~ ("Stage ID" -> stageId) ~ @@ -324,6 +326,19 @@ private[spark] object JsonProtocol { } } + def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = { + val transportMetrics = transportMetricsToJson(executorMetrics.transportMetrics) + ("Executor Hostname" -> executorMetrics.hostname) ~ + ("Executor Port" -> executorMetrics.port.map(new JInt(_)).getOrElse(JNothing)) ~ + ("TransportMetrics" -> transportMetrics) + } + + def transportMetricsToJson(transportMetrics: TransportMetrics): JValue = { + ("TimeStamp" -> transportMetrics.timeStamp) ~ + ("OnHeapSize" -> transportMetrics.onHeapSize) ~ + ("OffHeapSize" -> transportMetrics.offHeapSize) + } + def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = { val shuffleReadMetrics: JValue = ("Remote Blocks Fetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~ @@ -653,6 +668,7 @@ private[spark] object JsonProtocol { def executorMetricsUpdateFromJson(json: JValue): SparkListenerExecutorMetricsUpdate = { val execInfo = (json \ "Executor ID").extract[String] + val executorMetrics = executorMetricsFromJson(json \ "Executor Metrics Updated") val accumUpdates = (json \ "Metrics Updated").extract[List[JValue]].map { json => val taskId = (json \ "Task ID").extract[Long] val stageId = (json \ "Stage ID").extract[Int] @@ -661,7 +677,7 @@ private[spark] object JsonProtocol { (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson) (taskId, stageId, stageAttemptId, updates) } - SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates) + SparkListenerExecutorMetricsUpdate(execInfo, executorMetrics, accumUpdates) } /** --------------------------------------------------------------------- * @@ -766,6 +782,25 @@ private[spark] object JsonProtocol { } } + def executorMetricsFromJson(json: JValue): ExecutorMetrics = { + val metrics = new ExecutorMetrics + if (json == JNothing) { + return metrics + } + metrics.setHostname((json \ "Executor Hostname").extract[String]) + metrics.setPort(Utils.jsonOption(json \ "Executor Port").map(_.extract[Int])) + metrics.setTransportMetrics(transportMetricsFromJson(json \ "TransportMetrics")) + metrics + } + + def transportMetricsFromJson(json: JValue): TransportMetrics = { + val metrics = new TransportMetrics( + (json \ "TimeStamp").extract[Long], + (json \ "OnHeapSize").extract[Long], + (json \ "OffHeapSize").extract[Long]) + metrics + } + def taskMetricsFromJson(json: JValue): TaskMetrics = { val metrics = TaskMetrics.empty if (json == JNothing) { diff --git 
a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 88916488c0def..f95d450c5642b 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -29,7 +29,7 @@ import org.mockito.Matchers._ import org.mockito.Mockito.{mock, spy, verify, when} import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester} -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -77,7 +77,7 @@ class HeartbeatReceiverSuite heartbeatReceiverClock = new ManualClock heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock) heartbeatReceiverRef = sc.env.rpcEnv.setupEndpoint("heartbeat", heartbeatReceiver) - when(scheduler.executorHeartbeatReceived(any(), any(), any())).thenReturn(true) + when(scheduler.executorHeartbeatReceived(any(), any(), any(), any())).thenReturn(true) } /** @@ -214,7 +214,7 @@ class HeartbeatReceiverSuite val metrics = TaskMetrics.empty val blockManagerId = BlockManagerId(executorId, "localhost", 12345) val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( - Heartbeat(executorId, Array(1L -> metrics.accumulators()), blockManagerId)) + Heartbeat(executorId, null, Array(1L -> metrics.accumulators()), blockManagerId)) if (executorShouldReregister) { assert(response.reregisterBlockManager) } else { @@ -222,6 +222,7 @@ class HeartbeatReceiverSuite // Additionally verify that the scheduler callback is called with the correct parameters verify(scheduler).executorHeartbeatReceived( Matchers.eq(executorId), + Matchers.eq(null), Matchers.eq(Array(1L -> metrics.accumulators())), Matchers.eq(blockManagerId)) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index a10941b579fe2..2e07699493b00 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -30,6 +30,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager +import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException} @@ -116,6 +117,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def stop() = {} override def executorHeartbeatReceived( execId: String, + executorMetrics: ExecutorMetrics, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = true override def submitTasks(taskSet: TaskSet) = { @@ -562,6 +564,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def defaultParallelism(): Int = 2 override def executorHeartbeatReceived( execId: String, + executorMetrics: ExecutorMetrics, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = true override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 4c3d0b102152c..a3af5e10d2d5f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -136,6 +136,135 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit "a fine:mind$dollar{bills}.1", None, Some("lz4"))) } + test("test event logger logging executor metrics") { + import org.apache.spark.scheduler.cluster._ + import org.apache.spark.ui.memory._ + val conf = EventLoggingListenerSuite.getLoggingConf(testDirPath) + val eventLogger = new EventLoggingListener("test-memListener", None, testDirPath.toUri(), conf) + val execId = "exec-1" + val hostName = "host-1" + val port: Option[Int] = Some(80) + + eventLogger.start() + eventLogger.onExecutorAdded(SparkListenerExecutorAdded( + 0L, execId, new ExecutorInfo(hostName, 1, Map.empty))) + + // stage 1 and stage 2 submitted + eventLogger.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(1)) + eventLogger.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(2)) + val execMetrics1 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 1L, 20, 10) + eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId, execMetrics1)) + val execMetrics2 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 2L, 30, 10) + eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId, execMetrics2)) + // stage1 completed + eventLogger.onStageCompleted(MemoryListenerSuite.createStageEndEvent(1)) + // stage3 submitted + eventLogger.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(3)) + val execMetrics3 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 3L, 30, 30) + eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId, execMetrics3)) + val execMetrics4 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 4L, 20, 25) + eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId, execMetrics4)) + // stage 2 completed + eventLogger.onStageCompleted(MemoryListenerSuite.createStageEndEvent(2)) + val execMetrics5 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 5L, 15, 15) + eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId, execMetrics5)) + val execMetrics6 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 6L, 25, 10) + eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId, execMetrics6)) + // stage 3 completed + eventLogger.onStageCompleted(MemoryListenerSuite.createStageEndEvent(3)) + + eventLogger.onExecutorRemoved(SparkListenerExecutorRemoved(7L, execId, "")) + + // Totally there are 15 logged events, including: + // 2 events of executor Added/Removed + // 6 events of stage Submitted/Completed + // 7 events of executorMetrics update (3 combined metrics and 4 original metrics) + assert(eventLogger.loggedEvents.size === 15) + eventLogger.stop() + + val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) + val lines = readLines(logData) + Utils.tryWithSafeFinally { + // totally there are 16 lines, including SparkListenerLogStart event and 15 other events + assert(lines.size === 16) + + val listenerBus 
= new LiveListenerBus
+      val memoryListener = new MemoryListener
+      listenerBus.addListener(memoryListener)
+
+      val sparkEvents: Seq[SparkListenerEvent] = lines.map { line =>
+        val event = JsonProtocol.sparkEventFromJson(parse(line))
+        listenerBus.postToAll(event)
+        event
+      }
+
+      // Make sure there is always an original {{SparkListenerExecutorMetricsUpdate}} event
+      // logged before each stage completes.
+      val latestMetricsStage1 = getLatestExecutorMetricsBeforeStageEnd(sparkEvents, 1).get
+      val latestMetricsStage2 = getLatestExecutorMetricsBeforeStageEnd(sparkEvents, 2).get
+      val latestMetricsStage3 = getLatestExecutorMetricsBeforeStageEnd(sparkEvents, 3).get
+      assertMetrics(execId, (hostName, 2L, 30, 10), latestMetricsStage1)
+      assertMetrics(execId, (hostName, 4L, 20, 25), latestMetricsStage2)
+      assertMetrics(execId, (hostName, 6L, 25, 10), latestMetricsStage3)
+
+      // The following is an integration test with [[org.apache.spark.ui.memory.MemoryListener]]
+      // to make sure the events logged in the history file replay correctly.
+      val mapForStage1 = memoryListener.completedStagesToMem((1, 0))
+      val transMetrics1 = mapForStage1(execId).transportInfo.get
+      MemoryListenerSuite.assertTransMetrics(
+        30, 10, MemTime(30, 2L), MemTime(10, 2L), transMetrics1)
+      val mapForStage2 = memoryListener.completedStagesToMem((2, 0))
+      val transMetrics2 = mapForStage2(execId).transportInfo.get
+      MemoryListenerSuite.assertTransMetrics(
+        20, 25, MemTime(30, 3L), MemTime(30, 3L), transMetrics2)
+      val mapForStage3 = memoryListener.completedStagesToMem((3, 0))
+      val transMetrics3 = mapForStage3(execId).transportInfo.get
+      MemoryListenerSuite.assertTransMetrics(
+        25, 10, MemTime(30, 3L), MemTime(30, 3L), transMetrics3)
+    } {
+      logData.close()
+    }
+
+    def getLatestExecutorMetricsBeforeStageEnd(
+        events: Seq[SparkListenerEvent],
+        stageId: Int): Option[SparkListenerExecutorMetricsUpdate] = {
+      val itr = events.iterator
+      var latestMetrics: Option[SparkListenerExecutorMetricsUpdate] = None
+      var isStageSubmitted: Boolean = false
+      while (itr.hasNext) {
+        val event = itr.next()
+        event match {
+          case ss: SparkListenerStageSubmitted if ss.stageInfo.stageId == stageId =>
+            isStageSubmitted = true
+          case sc: SparkListenerStageCompleted if sc.stageInfo.stageId == stageId =>
+            return latestMetrics
+          case emu: SparkListenerExecutorMetricsUpdate if isStageSubmitted =>
+            latestMetrics = Some(emu)
+          case _ => // Do nothing for other events
+        }
+      }
+      latestMetrics
+    }
+
+    def assertMetrics(
+        execId: String,
+        metricsDetails: (String, Long, Long, Long),
+        event: SparkListenerExecutorMetricsUpdate): Unit = {
+      val execMetrics = event.executorMetrics
+      assert(execId === event.execId)
+      assert(metricsDetails._1 === execMetrics.hostname)
+      assert(metricsDetails._2 === execMetrics.transportMetrics.timeStamp)
+      assert(metricsDetails._3 === execMetrics.transportMetrics.onHeapSize)
+      assert(metricsDetails._4 === execMetrics.transportMetrics.offHeapSize)
+    }
+  }
+
   /* ----------------- *
    * Actual test logic *
    * ----------------- */
@@ -170,7 +299,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     // Verify file contains exactly the two events logged
     val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
-    try {
+    Utils.tryWithSafeFinally {
       val lines = readLines(logData)
       val logStart = SparkListenerLogStart(SPARK_VERSION)
       assert(lines.size === 3)
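       // lines(0) is the SparkListenerLogStart record that the listener writes first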
assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart) assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationStart) assert(JsonProtocol.sparkEventFromJson(parse(lines(2))) === applicationEnd) - } finally { + } { logData.close() } } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 93964a2d56743..d3625a02ae7cf 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -323,7 +323,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1236L))) listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L))) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array( + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, null, Array( (1234L, 0, 0, makeTaskMetrics(0).accumulators().map(AccumulatorSuite.makeInfo)), (1235L, 0, 0, makeTaskMetrics(100).accumulators().map(AccumulatorSuite.makeInfo)), (1236L, 1, 0, makeTaskMetrics(200).accumulators().map(AccumulatorSuite.makeInfo))))) diff --git a/core/src/test/scala/org/apache/spark/ui/memory/MemoryListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/memory/MemoryListenerSuite.scala new file mode 100644 index 0000000000000..d6ed2d9493cc2 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/ui/memory/MemoryListenerSuite.scala @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ui.memory + +import org.apache.spark._ +import org.apache.spark.executor._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster._ + +class MemoryListenerSuite extends SparkFunSuite { + + test("stages use executor metrics updated in previous stages") { + val listener = new MemoryListener + val execId1 = "exec-1" + val host1 = "host-1" + val port: Option[Int] = Some(80) + + listener.onExecutorAdded( + SparkListenerExecutorAdded(1L, execId1, new ExecutorInfo(host1, 1, Map.empty))) + + // stage 1, no metrics update + listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(1)) + listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(1)) + + // multiple metrics updates in stage 2 + listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(2)) + val execMetrics1 = MemoryListenerSuite.createExecutorMetrics(host1, port, 2L, 20, 10) + listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId1, execMetrics1)) + val execMetrics2 = MemoryListenerSuite.createExecutorMetrics(host1, port, 3L, 30, 5) + listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId1, execMetrics2)) + val execMetrics3 = MemoryListenerSuite.createExecutorMetrics(host1, port, 4L, 15, 15) + listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId1, execMetrics3)) + listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(2)) + + // stage 3 and stage 4 don't get metrics + listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(3)) + listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(3)) + listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(4)) + listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(4)) + + // no metrics for stage 1, since no metrics were updated during stage 1 + val mapForStage1 = listener.completedStagesToMem((1, 0)) + assert(mapForStage1(execId1).transportInfo === None) + + // stage 2 has aggregated values, since more than one metrics update arrived during it + val mapForStage2 = listener.completedStagesToMem((2, 0)) + val transMetrics2 = mapForStage2(execId1).transportInfo.get + MemoryListenerSuite.assertTransMetrics( + 15, 15, MemTime(30, 3), MemTime(15, 4), transMetrics2) + + // both stage 3 and stage 4 will use the metrics last updated in stage 2 + val mapForStage3 = listener.completedStagesToMem((3, 0)) + val memInfo3 = mapForStage3(execId1) + assert(memInfo3.transportInfo.isDefined) + val transMetrics3 = memInfo3.transportInfo.get + MemoryListenerSuite.assertTransMetrics( + 15, 15, MemTime(15, 4), MemTime(15, 4), transMetrics3) + + val mapForStage4 = listener.completedStagesToMem((4, 0)) + val memInfo4 = mapForStage4(execId1) + assert(memInfo4.transportInfo.isDefined) + val transMetrics4 = memInfo4.transportInfo.get + MemoryListenerSuite.assertTransMetrics( + 15, 15, MemTime(15, 4), MemTime(15, 4), transMetrics4) + + listener.onExecutorRemoved(SparkListenerExecutorRemoved(0L, execId1, "")) + } + + test("multiple executors with multiple stages") { + val listener = new MemoryListener + val (execId1, execId2, execId3) = ("exec-1", "exec-2", "exec-3") + val (host1, host2, host3) = ("host-1", "host-2", "host-3") + val (port1, port2, port3): (Option[Int], Option[Int], Option[Int]) = + (Some(80), Some(80), Some(80)) + + // two executors added first + listener.onExecutorAdded( + SparkListenerExecutorAdded(1L, execId1, new
ExecutorInfo(host1, 1, Map.empty))) + listener.onExecutorAdded( + SparkListenerExecutorAdded(2L, execId2, new ExecutorInfo(host2, 1, Map.empty))) + + // three executors run in one stage, and one executor is removed before the stage completes + listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(1)) + val exec1Metrics = MemoryListenerSuite.createExecutorMetrics(host1, port1, 3L, 20, 10) + listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId1, exec1Metrics)) + val exec2Metrics = MemoryListenerSuite.createExecutorMetrics(host2, port2, 4L, 15, 5) + listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId2, exec2Metrics)) + // one more executor added while the stage is running + listener.onExecutorAdded( + SparkListenerExecutorAdded(0L, execId3, new ExecutorInfo(host3, 1, Map.empty))) + val exec3Metrics = MemoryListenerSuite.createExecutorMetrics(host3, port3, 5L, 30, 15) + listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId3, exec3Metrics)) + + assert(listener.activeExecutorIdToMem.size === 3) + assert(listener.removedExecutorIdToMem.isEmpty) + + // executor 2 removed before the stage completes + listener.onExecutorRemoved(SparkListenerExecutorRemoved(6L, execId2, "")) + listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(1)) + (2 to 3).foreach { i => + listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(i)) + listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(i)) + } + + // all stages are completed, so there are no active stages now + assert(listener.activeStagesToMem.isEmpty) + + listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(4)) + listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId2, new ExecutorMetrics)) + + assert(listener.activeExecutorIdToMem.size === 3) + assert(listener.activeStagesToMem.size === 1) + + listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(4)) + + assert(listener.activeStagesToMem.isEmpty) + assert(listener.completedStagesToMem.size === 4) + assert(listener.removedExecutorIdToMem.size === 1) + + listener.onExecutorRemoved(SparkListenerExecutorRemoved(7L, execId1, "")) + listener.onExecutorRemoved(SparkListenerExecutorRemoved(8L, execId3, "")) + + assert(listener.removedExecutorIdToMem.size === 3) + + // {{completedStagesToMem}} maintains the metrics of both removed executors and + // newly added executors + val mapForStage1 = listener.completedStagesToMem((1, 0)) + assert(mapForStage1.size === 3) + + val transMetrics1 = mapForStage1(execId1).transportInfo.get + val transMetrics2 = mapForStage1(execId2).transportInfo.get + val transMetrics3 = mapForStage1(execId3).transportInfo.get + + MemoryListenerSuite.assertTransMetrics( + 20, 10, MemTime(20, 3), MemTime(10, 3), transMetrics1) + MemoryListenerSuite.assertTransMetrics( + 15, 5, MemTime(15, 4), MemTime(5, 4), transMetrics2) + MemoryListenerSuite.assertTransMetrics( + 30, 15, MemTime(30, 5), MemTime(15, 5), transMetrics3) + } +} + +object MemoryListenerSuite extends SparkFunSuite { + def createStageStartEvent(stageId: Int): SparkListenerStageSubmitted = { + val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, Seq.empty, Seq.empty, "") + SparkListenerStageSubmitted(stageInfo) + } + + def createStageEndEvent(stageId: Int): SparkListenerStageCompleted = { + val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, Seq.empty, Seq.empty, "") +
SparkListenerStageCompleted(stageInfo) + } + + def createExecutorMetricsUpdateEvent( + execId: String, + executorMetrics: ExecutorMetrics): SparkListenerExecutorMetricsUpdate = { + SparkListenerExecutorMetricsUpdate(execId, executorMetrics, Seq.empty) + } + + def createExecutorMetrics( + hostname: String, + port: Option[Int], + timeStamp: Long, + onHeapSize: Long, + offHeapSize: Long): ExecutorMetrics = { + ExecutorMetrics(hostname, port, TransportMetrics(timeStamp, onHeapSize, offHeapSize)) + } + + def assertTransMetrics( + onHeapSize: Long, + offHeapSize: Long, + peakOnHeapSizeTime: MemTime, + peakOffHeapSizeTime: MemTime, + transMemSize: TransportMemSize): Unit = { + assert(onHeapSize === transMemSize.onHeapSize) + assert(offHeapSize === transMemSize.offHeapSize) + assert(peakOnHeapSizeTime === transMemSize.peakOnHeapSizeTime) + assert(peakOffHeapSizeTime === transMemSize.peakOffHeapSizeTime) + } +} diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index a64dbeae47294..bbee4cb9b2acc 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -82,6 +82,13 @@ class JsonProtocolSuite extends SparkFunSuite { val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1", new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap)) val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason") + val executorMetrics = { + val execMetrics = new ExecutorMetrics + execMetrics.setHostname("host-1") + execMetrics.setPort(Some(80)) + execMetrics.setTransportMetrics(TransportMetrics(0L, 10, 10)) + execMetrics + } val executorBlacklisted = SparkListenerExecutorBlacklisted(executorBlacklistedTime, "exec1", 22) val executorUnblacklisted = SparkListenerExecutorUnblacklisted(executorUnblacklistedTime, "exec1") @@ -94,7 +101,7 @@ class JsonProtocolSuite extends SparkFunSuite { makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true) .accumulators().map(AccumulatorSuite.makeInfo) .zipWithIndex.map { case (a, i) => a.copy(id = i) } - SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates))) + SparkListenerExecutorMetricsUpdate("exec3", null, Seq((1L, 2, 3, accumUpdates))) } testEvent(stageSubmitted, stageSubmittedJsonString) @@ -432,6 +439,25 @@ class JsonProtocolSuite extends SparkFunSuite { testAccumValue(Some("anything"), 123, JString("123")) } + + test("ExecutorMetrics backward compatibility") { + // ExecutorMetrics is newly added, so old event logs lack the "Executor Metrics Updated" field + val accumUpdates = + makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true) + .accumulators().map(AccumulatorSuite.makeInfo) + .zipWithIndex.map { case (a, i) => a.copy(id = i) } + val executorMetricsUpdate = SparkListenerExecutorMetricsUpdate("exec3", new ExecutorMetrics, + Seq((1L, 2, 3, accumUpdates))) + assert(executorMetricsUpdate.executorMetrics != null) + assert(executorMetricsUpdate.executorMetrics.transportMetrics != null) + val newJson = JsonProtocol.executorMetricsUpdateToJson(executorMetricsUpdate) + val oldJson = newJson.removeField { case (field, _) => field == "Executor Metrics Updated" } + val newMetrics = JsonProtocol.executorMetricsUpdateFromJson(oldJson) + assert(newMetrics.executorMetrics.hostname === "") + assert(newMetrics.executorMetrics.port === None) + assert(newMetrics.executorMetrics.transportMetrics.onHeapSize === 0L) +
assert(newMetrics.executorMetrics.transportMetrics.offHeapSize === 0L) + assert(newMetrics.executorMetrics.transportMetrics.timeStamp != 0L) + } } @@ -1794,6 +1820,15 @@ private[spark] object JsonProtocolSuite extends Assertions { |{ | "Event": "SparkListenerExecutorMetricsUpdate", | "Executor ID": "exec3", + | "Executor Metrics Updated": { + | "Executor Hostname": "host-1", + | "Executor Port": 80, + | "TransportMetrics": { + | "TimeStamp": 0, + | "OnHeapSize": 10, + | "OffHeapSize": 10 + | } + | }, | "Metrics Updated": [ | { | "Task ID": 1, diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java new file mode 100644 index 0000000000000..b29ba6e199d50 --- /dev/null +++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.TransportContext; +import org.apache.spark.network.server.TransportChannelHandler; +import org.apache.spark.network.util.IOMode; +import org.apache.spark.network.util.JavaUtils; +import org.apache.spark.network.util.NettyUtils; +import org.apache.spark.network.util.TransportConf; + +/** + * Factory for creating {@link TransportClient}s by using createClient. + * + * The factory maintains a connection pool to other hosts and should return the same + * TransportClient for the same remote host. It also shares a single worker thread pool for + * all TransportClients. + * + * TransportClients will be reused whenever possible. Prior to completing the creation of a new + * TransportClient, all given {@link TransportClientBootstrap}s will be run. + */ +public class TransportClientFactory implements Closeable { + + /** A simple data structure to track the pool of clients between two peer nodes. 
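+ * Each remote address keeps an array of clients plus one lock object per slot: connection + * creation for a slot is serialized on its lock, while lookups of an already-active cached + * client stay lock-free.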
*/ + private static class ClientPool { + TransportClient[] clients; + Object[] locks; + + public ClientPool(int size) { + clients = new TransportClient[size]; + locks = new Object[size]; + for (int i = 0; i < size; i++) { + locks[i] = new Object(); + } + } + } + + private final Logger logger = LoggerFactory.getLogger(TransportClientFactory.class); + + private final TransportContext context; + private final TransportConf conf; + private final List clientBootstraps; + private final ConcurrentHashMap connectionPool; + + /** Random number generator for picking connections between peers. */ + private final Random rand; + private final int numConnectionsPerPeer; + + private final Class socketChannelClass; + private EventLoopGroup workerGroup; + private PooledByteBufAllocator pooledAllocator; + + public PooledByteBufAllocator getPooledAllocator() { + return pooledAllocator; + } + + public TransportClientFactory( + TransportContext context, + List clientBootstraps) { + this.context = Preconditions.checkNotNull(context); + this.conf = context.getConf(); + this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps)); + this.connectionPool = new ConcurrentHashMap(); + this.numConnectionsPerPeer = conf.numConnectionsPerPeer(); + this.rand = new Random(); + + IOMode ioMode = IOMode.valueOf(conf.ioMode()); + this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode); + // TODO: Make thread pool name configurable. + this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client"); + this.pooledAllocator = NettyUtils.createPooledByteBufAllocator( + conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()); + } + + /** + * Create a {@link TransportClient} connecting to the given remote host / port. + * + * We maintains an array of clients (size determined by spark.shuffle.io.numConnectionsPerPeer) + * and randomly picks one to use. If no client was previously created in the randomly selected + * spot, this function creates a new client and places it there. + * + * Prior to the creation of a new TransportClient, we will execute all + * {@link TransportClientBootstrap}s that are registered with this factory. + * + * This blocks until a connection is successfully established and fully bootstrapped. + * + * Concurrency: This method is safe to call from multiple threads. + */ + public TransportClient createClient(String remoteHost, int remotePort) throws IOException { + // Get connection from the connection pool first. + // If it is not found or not active, create a new one. + final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort); + + // Create the ClientPool if we don't have it yet. + ClientPool clientPool = connectionPool.get(address); + if (clientPool == null) { + connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer)); + clientPool = connectionPool.get(address); + } + + int clientIndex = rand.nextInt(numConnectionsPerPeer); + TransportClient cachedClient = clientPool.clients[clientIndex]; + + if (cachedClient != null && cachedClient.isActive()) { + // Make sure that the channel will not timeout by updating the last use time of the + // handler. Then check that the client is still alive, in case it timed out before + // this code was able to update things. 
+ TransportChannelHandler handler = cachedClient.getChannel().pipeline() + .get(TransportChannelHandler.class); + synchronized (handler) { + handler.getResponseHandler().updateTimeOfLastRequest(); + } + + if (cachedClient.isActive()) { + logger.trace("Returning cached connection to {}: {}", address, cachedClient); + return cachedClient; + } + } + + // If we reach here, we don't have an existing connection open. Let's create a new one. + // Multiple threads might race here to create new connections. Keep only one of them active. + synchronized (clientPool.locks[clientIndex]) { + cachedClient = clientPool.clients[clientIndex]; + + if (cachedClient != null) { + if (cachedClient.isActive()) { + logger.trace("Returning cached connection to {}: {}", address, cachedClient); + return cachedClient; + } else { + logger.info("Found inactive connection to {}, creating a new one.", address); + } + } + clientPool.clients[clientIndex] = createClient(address); + return clientPool.clients[clientIndex]; + } + } + + /** + * Create a completely new {@link TransportClient} to the given remote host / port. + * This connection is not pooled. + * + * As with {@link #createClient(String, int)}, this method is blocking. + */ + public TransportClient createUnmanagedClient(String remoteHost, int remotePort) + throws IOException { + final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort); + return createClient(address); + } + + /** Create a completely new {@link TransportClient} to the remote address. */ + private TransportClient createClient(InetSocketAddress address) throws IOException { + logger.debug("Creating new connection to " + address); + + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(workerGroup) + .channel(socketChannelClass) + // Disable Nagle's Algorithm since we don't want packets to wait + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs()) + .option(ChannelOption.ALLOCATOR, pooledAllocator); + + final AtomicReference clientRef = new AtomicReference(); + final AtomicReference channelRef = new AtomicReference(); + + bootstrap.handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + TransportChannelHandler clientHandler = context.initializePipeline(ch); + clientRef.set(clientHandler.getClient()); + channelRef.set(ch); + } + }); + + // Connect to the remote server + long preConnect = System.nanoTime(); + ChannelFuture cf = bootstrap.connect(address); + if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) { + throw new IOException( + String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs())); + } else if (cf.cause() != null) { + throw new IOException(String.format("Failed to connect to %s", address), cf.cause()); + } + + TransportClient client = clientRef.get(); + Channel channel = channelRef.get(); + assert client != null : "Channel future completed successfully with null client"; + + // Execute any client bootstraps synchronously before marking the Client as successful. 
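+ // A bootstrap failure below closes the client and rethrows, so callers never receive a + // half-initialized connection.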
+ long preBootstrap = System.nanoTime(); + logger.debug("Connection to {} successful, running bootstraps...", address); + try { + for (TransportClientBootstrap clientBootstrap : clientBootstraps) { + clientBootstrap.doBootstrap(client, channel); + } + } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala + long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000; + logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e); + client.close(); + throw Throwables.propagate(e); + } + long postBootstrap = System.nanoTime(); + + logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)", + address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000); + + return client; + } + + /** Close all connections in the connection pool, and shutdown the worker thread pool. */ + @Override + public void close() { + // Go through all clients and close them if they are active. + for (ClientPool clientPool : connectionPool.values()) { + for (int i = 0; i < clientPool.clients.length; i++) { + TransportClient client = clientPool.clients[i]; + if (client != null) { + clientPool.clients[i] = null; + JavaUtils.closeQuietly(client); + } + } + } + connectionPool.clear(); + + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + workerGroup = null; + } + } +} diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java new file mode 100644 index 0000000000000..165d2d8a88ecb --- /dev/null +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.io.Closeable; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import org.apache.spark.network.util.JavaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.TransportContext; +import org.apache.spark.network.util.IOMode; +import org.apache.spark.network.util.NettyUtils; +import org.apache.spark.network.util.TransportConf; + +/** + * Server for the efficient, low-level streaming service. 
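+ * The pooled ByteBuf allocator is exposed via {@link #getAllocator()} so that the Netty + * block transfer service can sample on-heap and off-heap arena usage for executor memory + * metrics.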
+ */ +public class TransportServer implements Closeable { + private final Logger logger = LoggerFactory.getLogger(TransportServer.class); + + private final TransportContext context; + private final TransportConf conf; + private final RpcHandler appRpcHandler; + private final List bootstraps; + + private ServerBootstrap bootstrap; + private ChannelFuture channelFuture; + private PooledByteBufAllocator allocator; + private int port = -1; + + public PooledByteBufAllocator getAllocator() { + return allocator; + } + + /** + * Creates a TransportServer that binds to the given host and the given port, or to any available + * if 0. If you don't want to bind to any special host, set "hostToBind" to null. + **/ + public TransportServer( + TransportContext context, + String hostToBind, + int portToBind, + RpcHandler appRpcHandler, + List bootstraps) { + this.context = context; + this.conf = context.getConf(); + this.appRpcHandler = appRpcHandler; + this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps)); + + try { + init(hostToBind, portToBind); + } catch (RuntimeException e) { + JavaUtils.closeQuietly(this); + throw e; + } + } + + public int getPort() { + if (port == -1) { + throw new IllegalStateException("Server not initialized"); + } + return port; + } + + private void init(String hostToBind, int portToBind) { + + IOMode ioMode = IOMode.valueOf(conf.ioMode()); + EventLoopGroup bossGroup = + NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server"); + EventLoopGroup workerGroup = bossGroup; + + allocator = NettyUtils.createPooledByteBufAllocator( + conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads()); + + bootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NettyUtils.getServerChannelClass(ioMode)) + .option(ChannelOption.ALLOCATOR, allocator) + .childOption(ChannelOption.ALLOCATOR, allocator); + + if (conf.backLog() > 0) { + bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog()); + } + + if (conf.receiveBuf() > 0) { + bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf()); + } + + if (conf.sendBuf() > 0) { + bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf()); + } + + bootstrap.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + RpcHandler rpcHandler = appRpcHandler; + for (TransportServerBootstrap bootstrap : bootstraps) { + rpcHandler = bootstrap.doBootstrap(ch, rpcHandler); + } + context.initializePipeline(ch, rpcHandler); + } + }); + + InetSocketAddress address = hostToBind == null ? 
+ new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind); + channelFuture = bootstrap.bind(address); + channelFuture.syncUninterruptibly(); + + port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort(); + logger.debug("Shuffle server started on port :" + port); + } + + @Override + public void close() { + if (channelFuture != null) { + // close is a local operation and should finish within milliseconds; timeout just to be safe + channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS); + channelFuture = null; + } + if (bootstrap != null && bootstrap.group() != null) { + bootstrap.group().shutdownGracefully(); + } + if (bootstrap != null && bootstrap.childGroup() != null) { + bootstrap.childGroup().shutdownGracefully(); + } + bootstrap = null; + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index e41c00ecec271..21396c305ea70 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -139,7 +139,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest assert(listener.getExecutionMetrics(0).isEmpty) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", null, Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) @@ -152,7 +152,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L)))) checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", null, Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), (1L, 0, 0, @@ -164,7 +164,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest // Retrying a stage should reset the metrics listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", null, Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) @@ -204,7 +204,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest // Summit a new stage listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0))) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", null, Seq( // (task id, stage id, stage attempt, accum updates) (0L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), (1L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) From 543cd88f18210311691c86db30eb6ad8618386b6 Mon Sep 17 00:00:00 2001 
From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Tue, 4 Apr 2017 12:53:34 -0500 Subject: [PATCH 2/8] more fixups to get things compiling --- .../apache/spark/network/client/TransportClientFactory.java | 2 ++ .../org/apache/spark/network/server/TransportServer.java | 5 ++++- .../apache/spark/scheduler/EventLoggingListenerSuite.scala | 2 +- .../apache/spark/scheduler/ExternalClusterManagerSuite.scala | 2 ++ .../scala/org/apache/spark/storage/BlockManagerSuite.scala | 4 +++- .../apache/spark/network/client/TransportClientFactory.java | 4 +--- 6 files changed, 13 insertions(+), 6 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index b50e043d5c9ce..290fcecf8a733 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -88,6 +88,8 @@ private static class ClientPool { private EventLoopGroup workerGroup; private PooledByteBufAllocator pooledAllocator; + public PooledByteBufAllocator getPooledAllocator() { return pooledAllocator; } + public TransportClientFactory( TransportContext context, List clientBootstraps) { diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java index 047c5f3f1f094..e1d44f3a8caf1 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -53,6 +53,7 @@ public class TransportServer implements Closeable { private ServerBootstrap bootstrap; private ChannelFuture channelFuture; + private PooledByteBufAllocator allocator; private int port = -1; /** @@ -78,6 +79,8 @@ public TransportServer( } } + public PooledByteBufAllocator getAllocator() { return allocator; } + public int getPort() { if (port == -1) { throw new IllegalStateException("Server not initialized"); @@ -92,7 +95,7 @@ private void init(String hostToBind, int portToBind) { NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server"); EventLoopGroup workerGroup = bossGroup; - PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator( + allocator = NettyUtils.createPooledByteBufAllocator( conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads()); bootstrap = new ServerBootstrap() diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index a3af5e10d2d5f..84e9e37fde2b6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -194,7 +194,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit // totally there are 16 lines, including SparkListenerLogStart event and 15 other events assert(lines.size === 16) - val listenerBus = new LiveListenerBus + val listenerBus = new LiveListenerBus(sc) val memoryListener = new MemoryListener listenerBus.addListener(memoryListener) diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index ba56af8215cd7..201111274d204 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.AccumulatorV2 @@ -87,6 +88,7 @@ private class DummyTaskScheduler extends TaskScheduler { override def applicationAttemptId(): Option[String] = None def executorHeartbeatReceived( execId: String, + executorMetrics: ExecutorMetrics, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = true } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 64a67b4c4cbab..e84c30f2e56b3 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -34,7 +34,7 @@ import org.scalatest.concurrent.Timeouts._ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager -import org.apache.spark.executor.DataReadMethod +import org.apache.spark.executor.{DataReadMethod, ExecutorMetrics} import org.apache.spark.memory.UnifiedMemoryManager import org.apache.spark.network.{BlockDataManager, BlockTransferService} import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} @@ -1257,6 +1257,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE listener.onBlockFetchSuccess("mockBlockId", new NioManagedBuffer(ByteBuffer.allocate(1))) } + override def getMemMetrics(executorMetrics: ExecutorMetrics): Unit = {} + override def close(): Unit = {} override def hostName: String = { "MockBlockTransferServiceHost" } diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index b29ba6e199d50..d5c17eb2d3724 100644 --- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -88,9 +88,7 @@ public ClientPool(int size) { private EventLoopGroup workerGroup; private PooledByteBufAllocator pooledAllocator; - public PooledByteBufAllocator getPooledAllocator() { - return pooledAllocator; - } + public PooledByteBufAllocator getPooledAllocator() { return pooledAllocator; } public TransportClientFactory( TransportContext context, From b794b86a37e816938a3172bb0a7aeb0e95ca0ca7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Tue, 4 Apr 2017 13:13:46 -0500 Subject: [PATCH 3/8] fix JsonProtocolSuite --- .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index bbee4cb9b2acc..44c872af2cc85 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -101,7 +101,7 @@ class JsonProtocolSuite extends SparkFunSuite { makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true) .accumulators().map(AccumulatorSuite.makeInfo) .zipWithIndex.map { case (a, i) => a.copy(id = i) } - SparkListenerExecutorMetricsUpdate("exec3", null, Seq((1L, 2, 3, accumUpdates))) + SparkListenerExecutorMetricsUpdate("exec3", executorMetrics, Seq((1L, 2, 3, accumUpdates))) } testEvent(stageSubmitted, stageSubmittedJsonString) From 941eef06587d2b20a63f46c86c3534b88a6d3bf7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Mon, 10 Apr 2017 20:56:05 -0500 Subject: [PATCH 4/8] Some fixups related to internal code review. More are coming. --- .../scala/org/apache/spark/SparkEnv.scala | 3 +- .../org/apache/spark/executor/Executor.scala | 9 +- .../spark/executor/ExecutorMetrics.scala | 31 +- .../spark/network/BlockTransferService.scala | 2 +- .../netty/NettyBlockTransferService.scala | 17 +- .../scheduler/EventLoggingListener.scala | 23 +- .../client/TransportClientFactory.java | 266 ------------------ .../spark/network/server/TransportServer.java | 156 ---------- 8 files changed, 34 insertions(+), 473 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 018a181e0edbd..d589a855aae46 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -20,10 +20,11 @@ package org.apache.spark import java.io.File import java.net.Socket -import com.google.common.collect.MapMaker import scala.collection.mutable import scala.util.Properties +import com.google.common.collect.MapMaker + import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.python.PythonWorkerFactory import org.apache.spark.broadcast.BroadcastManager diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 7a2c2fa90ceae..bb5b9d69c58cf 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -710,11 +710,12 @@ private[spark] class Executor( } } - env.blockTransferService.getMemMetrics(this.executorMetrics) + env.blockTransferService.updateMemMetrics(this.executorMetrics) val executorMetrics = if (isLocal) { - // JobProgressListener might hold a reference of it during onExecutorMetricsUpdate() - // in future, if then JobProgressListener cannot see the changes of metrics any - // more, so make a deep copy of it here for future change. + // When running locally, there is a chance that the executorMetrics could change + // out from under us. So, copy them here, using serialization and deserialization + // to create a new object. + // TODO: Find a better way of doing this. Utils.deserialize[ExecutorMetrics](Utils.serialize(this.executorMetrics)) } else { this.executorMetrics diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala index d63209f52b877..eed8f3ab230d6 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala @@ -22,36 +22,27 @@ import org.apache.spark.annotation.DeveloperApi /** * :: DeveloperApi :: * Metrics tracked during the execution of an executor. 
- * - * So, when adding new fields, take into consideration that the whole object can be serialized for - * shipping off at any time to consumers of the SparkListener interface. */ @DeveloperApi class ExecutorMetrics extends Serializable { - /** - * Host's name the executor runs on - */ private var _hostname: String = "" def hostname: String = _hostname private[spark] def setHostname(value: String) = _hostname = value - /** - * Host's port the executor runs on - */ private var _port: Option[Int] = None def port: Option[Int] = _port private[spark] def setPort(value: Option[Int]) = _port = value private[spark] def hostPort: String = { - val hp = port match { + port match { case None => hostname case value => hostname + ":" + value.get } - hp } - private var _transportMetrics: TransportMetrics = new TransportMetrics + private var _transportMetrics: TransportMetrics = + new TransportMetrics(System.currentTimeMillis(), 0L, 0L) def transportMetrics: TransportMetrics = _transportMetrics private[spark] def setTransportMetrics(value: TransportMetrics) = { _transportMetrics = value @@ -76,16 +67,8 @@ object ExecutorMetrics extends Serializable { * Metrics for network layer */ @DeveloperApi -class TransportMetrics ( - val timeStamp: Long = System.currentTimeMillis, - val onHeapSize: Long = 0L, - val offHeapSize: Long = 0L) extends Serializable +case class TransportMetrics ( + val timeStamp: Long, + val onHeapSize: Long, + val offHeapSize: Long) extends Serializable -object TransportMetrics extends Serializable { - def apply( - timeStamp: Long, - onHeapSize: Long, - offHeapSize: Long): TransportMetrics = { - new TransportMetrics(timeStamp, onHeapSize, offHeapSize) - } -} diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala index ebe4dbc95413a..b0e341d340584 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala @@ -43,7 +43,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo /** * Collect current executor memory metrics of transferService. */ - private[spark] def getMemMetrics(executorMetrics: ExecutorMetrics): Unit + private[spark] def updateMemMetrics(executorMetrics: ExecutorMetrics): Unit /** * Tear down the transfer service. 
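For reference, the aggregation that the MemoryListenerSuite and event-logging tests above assert can be summarized as: the latest TransportMetrics sample becomes the current on/off-heap size, while peaks are tracked as MemTime pairs, with a sample that ties or beats the peak taking the newer timestamp. Below is a minimal sketch of that fold, assuming MemTime(memorySize, timeStamp) and the TransportMemSize field names used by assertTransMetrics; the helper and object names are illustrative, and the actual MemoryListener in this series may differ in detail.

    // Minimal sketch only: mirrors the expectations asserted in MemoryListenerSuite;
    // names are assumptions, not the patch's actual MemoryListener implementation.
    case class MemTime(memorySize: Long = 0L, timeStamp: Long = 0L)

    case class TransportMemSize(
        onHeapSize: Long,
        offHeapSize: Long,
        peakOnHeapSizeTime: MemTime,
        peakOffHeapSizeTime: MemTime)

    object TransportMemSize {
      /** Fold one TransportMetrics sample into the running per-executor summary. */
      def update(
          prev: Option[TransportMemSize],
          time: Long,
          onHeap: Long,
          offHeap: Long): TransportMemSize = {
        val peakOn = prev.map(_.peakOnHeapSizeTime).getOrElse(MemTime())
        val peakOff = prev.map(_.peakOffHeapSizeTime).getOrElse(MemTime())
        TransportMemSize(
          onHeapSize = onHeap, // the latest sample is the current size
          offHeapSize = offHeap,
          // a sample that ties or beats the peak takes the newer timestamp
          peakOnHeapSizeTime =
            if (onHeap >= peakOn.memorySize) MemTime(onHeap, time) else peakOn,
          peakOffHeapSizeTime =
            if (offHeap >= peakOff.memorySize) MemTime(offHeap, time) else peakOff)
      }
    }

    object MemTimeDemo extends App {
      // The three stage-2 samples from MemoryListenerSuite: (timeStamp, onHeap, offHeap)
      val updates = Seq((2L, 20L, 10L), (3L, 30L, 5L), (4L, 15L, 15L))
      val folded = updates.foldLeft(Option.empty[TransportMemSize]) {
        case (acc, (time, on, off)) => Some(TransportMemSize.update(acc, time, on, off))
      }
      // Prints TransportMemSize(15,15,MemTime(30,3),MemTime(15,4)), matching transMetrics2.
      println(folded.get)
    }

Replaying all four samples that fall inside stage 2 of the event-logging test ((1L, 20, 10) through (4L, 20, 25)) through the same fold likewise yields peaks MemTime(30, 3) on- and off-heap, which is why the tie goes to the newer timestamp in the sketch above.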
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index 547e863b69ffd..8201fe323b3e0 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -22,6 +22,7 @@ import java.nio.ByteBuffer import scala.collection.JavaConverters._ import scala.concurrent.{Future, Promise} import scala.reflect.ClassTag +import java.util.{List => JList} import io.netty.buffer._ @@ -69,14 +70,14 @@ private[spark] class NettyBlockTransferService( clock = newClock } - private[spark] override def getMemMetrics(executorMetrics: ExecutorMetrics): Unit = { + private[spark] override def updateMemMetrics(executorMetrics: ExecutorMetrics): Unit = { val currentTime = clock.getTimeMillis() val clientPooledAllocator = clientFactory.getPooledAllocator() val serverAllocator = server.getAllocator() - val clientOffHeapSize: Long = sumOfMetrics(convToScala(clientPooledAllocator.directArenas())) - val clientOnHeapSize: Long = sumOfMetrics(convToScala(clientPooledAllocator.heapArenas())) - val serverOffHeapSize: Long = sumOfMetrics(convToScala(serverAllocator.directArenas())) - val serverOnHeapSize: Long = sumOfMetrics(convToScala(serverAllocator.heapArenas())) + val clientOffHeapSize = sumOfMetrics(clientPooledAllocator.directArenas()) + val clientOnHeapSize = sumOfMetrics(clientPooledAllocator.heapArenas()) + val serverOffHeapSize = sumOfMetrics(serverAllocator.directArenas()) + val serverOnHeapSize = sumOfMetrics(serverAllocator.heapArenas()) logDebug(s"Current Netty Client offheap size is $clientOffHeapSize, " + s"Client heap size is $clientOnHeapSize, Server offheap size is $serverOffHeapSize, " + s"server heap size is $serverOnHeapSize, executor id is " + @@ -85,10 +86,8 @@ private[spark] class NettyBlockTransferService( clientOnHeapSize + serverOnHeapSize, clientOffHeapSize + serverOffHeapSize)) } - private def convToScala = (x: java.util.List[PoolArenaMetric]) => x.asScala - - private def sumOfMetrics(arenaMetricList: Seq[PoolArenaMetric]): Long = { - arenaMetricList.map { Arena => + private def sumOfMetrics(arenaMetricList: JList[PoolArenaMetric]): Long = { + arenaMetricList.asScala.map { Arena => Arena.chunkLists().asScala.map { chunk => chunk.iterator().asScala.map(_.chunkSize()).sum }.sum diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 2fe76361bed7b..217c316358d50 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -149,18 +149,16 @@ private[spark] class EventLoggingListener( } } - // When a stage is submitted and completed, we updated our executor memory metrics for that - // stage, and then log the metrics. Anytime we receive more executor metrics, we update our + // When a stage is submitted and completed, update the executor memory metrics for that + // stage, and then log the metrics. Anytime we receive more executor metrics, update the // running set of {{executorIdToLatestMetrics}} and {{executorIdToModifiedMaxMetrics}}. - // Since stages submit and complete time might be interleaved, we maintain the latest and - // max metrics for each time segment.
So, for each stage start and stage complete, we - // replace each item in {{executorIdToModifiedMaxMetrics}} with that - // in {{executorIdToLatestMetrics}}. + // Since stage submit and complete times might be interleaved, maintain the latest and + // max metrics for each time segment. For each stage start and stage completion, replace + // each item in {{executorIdToModifiedMaxMetrics}} with that in {{executorIdToLatestMetrics}}. private def updateAndLogExecutorMemoryMetrics() : Unit = { executorIdToModifiedMaxMetrics.foreach { case(_, metrics) => logEvent(metrics) } - // Clear the modified metrics map after each log action executorIdToModifiedMaxMetrics.clear() - executorIdToLatestMetrics.foreach {case(_, metrics) => logEvent(metrics) } + executorIdToLatestMetrics.foreach { case(_, metrics) => logEvent(metrics) } } // Events that do not trigger a flush @@ -292,8 +290,11 @@ private[spark] class EventLoggingListener( } /** - * According to the updated event to modify the maintained event's metrics - * @param latestEvent the latest event received that used to update the maintained metric + * Process an executor metrics update and update our stored cache of events. + * Does this event match the ID of an executor we are already tracking? + * If no, start tracking metrics for this executor, starting at this event. + * If yes, compare time stamps, and perhaps update using this event. + * @param latestEvent the latest event received, used to update our map of stored metrics. */ private def updateModifiedMetrics(latestEvent: SparkListenerExecutorMetricsUpdate): Unit = { val executorId = latestEvent.execId @@ -319,8 +320,6 @@ private[spark] class EventLoggingListener( } else { toBeModTransMetrics.offHeapSize } - - // We should maintain a new instance for each update to avoid side-effect val modifiedExecMetrics = ExecutorMetrics(toBeModifiedEvent.executorMetrics.hostname, toBeModifiedEvent.executorMetrics.port, TransportMetrics(timeStamp, onHeapSize, offHeapSize)) diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index d5c17eb2d3724..e69de29bb2d1d 100644 --- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -1,266 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.network.client; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.List; -import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; - -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.spark.network.TransportContext; -import org.apache.spark.network.server.TransportChannelHandler; -import org.apache.spark.network.util.IOMode; -import org.apache.spark.network.util.JavaUtils; -import org.apache.spark.network.util.NettyUtils; -import org.apache.spark.network.util.TransportConf; - -/** - * Factory for creating {@link TransportClient}s by using createClient. - * - * The factory maintains a connection pool to other hosts and should return the same - * TransportClient for the same remote host. It also shares a single worker thread pool for - * all TransportClients. - * - * TransportClients will be reused whenever possible. Prior to completing the creation of a new - * TransportClient, all given {@link TransportClientBootstrap}s will be run. - */ -public class TransportClientFactory implements Closeable { - - /** A simple data structure to track the pool of clients between two peer nodes. */ - private static class ClientPool { - TransportClient[] clients; - Object[] locks; - - public ClientPool(int size) { - clients = new TransportClient[size]; - locks = new Object[size]; - for (int i = 0; i < size; i++) { - locks[i] = new Object(); - } - } - } - - private final Logger logger = LoggerFactory.getLogger(TransportClientFactory.class); - - private final TransportContext context; - private final TransportConf conf; - private final List clientBootstraps; - private final ConcurrentHashMap connectionPool; - - /** Random number generator for picking connections between peers. */ - private final Random rand; - private final int numConnectionsPerPeer; - - private final Class socketChannelClass; - private EventLoopGroup workerGroup; - private PooledByteBufAllocator pooledAllocator; - - public PooledByteBufAllocator getPooledAllocator() { return pooledAllocator; } - - public TransportClientFactory( - TransportContext context, - List clientBootstraps) { - this.context = Preconditions.checkNotNull(context); - this.conf = context.getConf(); - this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps)); - this.connectionPool = new ConcurrentHashMap(); - this.numConnectionsPerPeer = conf.numConnectionsPerPeer(); - this.rand = new Random(); - - IOMode ioMode = IOMode.valueOf(conf.ioMode()); - this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode); - // TODO: Make thread pool name configurable. 
- this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client"); - this.pooledAllocator = NettyUtils.createPooledByteBufAllocator( - conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()); - } - - /** - * Create a {@link TransportClient} connecting to the given remote host / port. - * - * We maintains an array of clients (size determined by spark.shuffle.io.numConnectionsPerPeer) - * and randomly picks one to use. If no client was previously created in the randomly selected - * spot, this function creates a new client and places it there. - * - * Prior to the creation of a new TransportClient, we will execute all - * {@link TransportClientBootstrap}s that are registered with this factory. - * - * This blocks until a connection is successfully established and fully bootstrapped. - * - * Concurrency: This method is safe to call from multiple threads. - */ - public TransportClient createClient(String remoteHost, int remotePort) throws IOException { - // Get connection from the connection pool first. - // If it is not found or not active, create a new one. - final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort); - - // Create the ClientPool if we don't have it yet. - ClientPool clientPool = connectionPool.get(address); - if (clientPool == null) { - connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer)); - clientPool = connectionPool.get(address); - } - - int clientIndex = rand.nextInt(numConnectionsPerPeer); - TransportClient cachedClient = clientPool.clients[clientIndex]; - - if (cachedClient != null && cachedClient.isActive()) { - // Make sure that the channel will not timeout by updating the last use time of the - // handler. Then check that the client is still alive, in case it timed out before - // this code was able to update things. - TransportChannelHandler handler = cachedClient.getChannel().pipeline() - .get(TransportChannelHandler.class); - synchronized (handler) { - handler.getResponseHandler().updateTimeOfLastRequest(); - } - - if (cachedClient.isActive()) { - logger.trace("Returning cached connection to {}: {}", address, cachedClient); - return cachedClient; - } - } - - // If we reach here, we don't have an existing connection open. Let's create a new one. - // Multiple threads might race here to create new connections. Keep only one of them active. - synchronized (clientPool.locks[clientIndex]) { - cachedClient = clientPool.clients[clientIndex]; - - if (cachedClient != null) { - if (cachedClient.isActive()) { - logger.trace("Returning cached connection to {}: {}", address, cachedClient); - return cachedClient; - } else { - logger.info("Found inactive connection to {}, creating a new one.", address); - } - } - clientPool.clients[clientIndex] = createClient(address); - return clientPool.clients[clientIndex]; - } - } - - /** - * Create a completely new {@link TransportClient} to the given remote host / port. - * This connection is not pooled. - * - * As with {@link #createClient(String, int)}, this method is blocking. - */ - public TransportClient createUnmanagedClient(String remoteHost, int remotePort) - throws IOException { - final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort); - return createClient(address); - } - - /** Create a completely new {@link TransportClient} to the remote address. 
- */
-  private TransportClient createClient(InetSocketAddress address) throws IOException {
-    logger.debug("Creating new connection to " + address);
-
-    Bootstrap bootstrap = new Bootstrap();
-    bootstrap.group(workerGroup)
-      .channel(socketChannelClass)
-      // Disable Nagle's Algorithm since we don't want packets to wait
-      .option(ChannelOption.TCP_NODELAY, true)
-      .option(ChannelOption.SO_KEEPALIVE, true)
-      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
-      .option(ChannelOption.ALLOCATOR, pooledAllocator);
-
-    final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
-    final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();
-
-    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
-      @Override
-      public void initChannel(SocketChannel ch) {
-        TransportChannelHandler clientHandler = context.initializePipeline(ch);
-        clientRef.set(clientHandler.getClient());
-        channelRef.set(ch);
-      }
-    });
-
-    // Connect to the remote server
-    long preConnect = System.nanoTime();
-    ChannelFuture cf = bootstrap.connect(address);
-    if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
-      throw new IOException(
-        String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
-    } else if (cf.cause() != null) {
-      throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
-    }
-
-    TransportClient client = clientRef.get();
-    Channel channel = channelRef.get();
-    assert client != null : "Channel future completed successfully with null client";
-
-    // Execute any client bootstraps synchronously before marking the Client as successful.
-    long preBootstrap = System.nanoTime();
-    logger.debug("Connection to {} successful, running bootstraps...", address);
-    try {
-      for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
-        clientBootstrap.doBootstrap(client, channel);
-      }
-    } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
-      long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
-      logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
-      client.close();
-      throw Throwables.propagate(e);
-    }
-    long postBootstrap = System.nanoTime();
-
-    logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
-      address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
-
-    return client;
-  }
-
-  /** Close all connections in the connection pool, and shutdown the worker thread pool. */
-  @Override
-  public void close() {
-    // Go through all clients and close them if they are active.
-    for (ClientPool clientPool : connectionPool.values()) {
-      for (int i = 0; i < clientPool.clients.length; i++) {
-        TransportClient client = clientPool.clients[i];
-        if (client != null) {
-          clientPool.clients[i] = null;
-          JavaUtils.closeQuietly(client);
-        }
-      }
-    }
-    connectionPool.clear();
-
-    if (workerGroup != null) {
-      workerGroup.shutdownGracefully();
-      workerGroup = null;
-    }
-  }
-}
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index 165d2d8a88ecb..e69de29bb2d1d 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import java.io.Closeable;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import org.apache.spark.network.util.JavaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.TransportContext;
-import org.apache.spark.network.util.IOMode;
-import org.apache.spark.network.util.NettyUtils;
-import org.apache.spark.network.util.TransportConf;
-
-/**
- * Server for the efficient, low-level streaming service.
- */
-public class TransportServer implements Closeable {
-  private final Logger logger = LoggerFactory.getLogger(TransportServer.class);
-
-  private final TransportContext context;
-  private final TransportConf conf;
-  private final RpcHandler appRpcHandler;
-  private final List<TransportServerBootstrap> bootstraps;
-
-  private ServerBootstrap bootstrap;
-  private ChannelFuture channelFuture;
-  private PooledByteBufAllocator allocator;
-  private int port = -1;
-
-  public PooledByteBufAllocator getAllocator() {
-    return allocator;
-  }
-
-  /**
-   * Creates a TransportServer that binds to the given host and the given port, or to any
-   * available port if portToBind is 0. If you don't want to bind to any special host, set
-   * "hostToBind" to null.
-   **/
-  public TransportServer(
-      TransportContext context,
-      String hostToBind,
-      int portToBind,
-      RpcHandler appRpcHandler,
-      List<TransportServerBootstrap> bootstraps) {
-    this.context = context;
-    this.conf = context.getConf();
-    this.appRpcHandler = appRpcHandler;
-    this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
-
-    try {
-      init(hostToBind, portToBind);
-    } catch (RuntimeException e) {
-      JavaUtils.closeQuietly(this);
-      throw e;
-    }
-  }
-
-  public int getPort() {
-    if (port == -1) {
-      throw new IllegalStateException("Server not initialized");
-    }
-    return port;
-  }
-
-  private void init(String hostToBind, int portToBind) {
-
-    IOMode ioMode = IOMode.valueOf(conf.ioMode());
-    EventLoopGroup bossGroup =
-      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
-    EventLoopGroup workerGroup = bossGroup;
-
-    allocator = NettyUtils.createPooledByteBufAllocator(
-      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
-
-    bootstrap = new ServerBootstrap()
-      .group(bossGroup, workerGroup)
-      .channel(NettyUtils.getServerChannelClass(ioMode))
-      .option(ChannelOption.ALLOCATOR, allocator)
-      .childOption(ChannelOption.ALLOCATOR, allocator);
-
-    if (conf.backLog() > 0) {
-      bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
-    }
-
-    if (conf.receiveBuf() > 0) {
-      bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
-    }
-
-    if (conf.sendBuf() > 0) {
-      bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
-    }
-
-    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
-      @Override
-      protected void initChannel(SocketChannel ch) throws Exception {
-        RpcHandler rpcHandler = appRpcHandler;
-        for (TransportServerBootstrap bootstrap : bootstraps) {
-          rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
-        }
-        context.initializePipeline(ch, rpcHandler);
-      }
-    });
-
-    InetSocketAddress address = hostToBind == null ?
-      new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
-    channelFuture = bootstrap.bind(address);
-    channelFuture.syncUninterruptibly();
-
-    port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
-    logger.debug("Shuffle server started on port :" + port);
-  }
-
-  @Override
-  public void close() {
-    if (channelFuture != null) {
-      // close is a local operation and should finish within milliseconds; timeout just to be safe
-      channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
-      channelFuture = null;
-    }
-    if (bootstrap != null && bootstrap.group() != null) {
-      bootstrap.group().shutdownGracefully();
-    }
-    if (bootstrap != null && bootstrap.childGroup() != null) {
-      bootstrap.childGroup().shutdownGracefully();
-    }
-    bootstrap = null;
-  }
-}

From 72ed41eb449626e06fbed22b3a96dfd3f299e58d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?=
Date: Mon, 10 Apr 2017 20:58:54 -0500
Subject: [PATCH 5/8] delete stragglers

---
 .../org/apache/spark/network/client/TransportClientFactory.java | 0
 .../java/org/apache/spark/network/server/TransportServer.java   | 0
 2 files changed, 0 insertions(+), 0 deletions(-)
 delete mode 100644 network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
 delete mode 100644 network/common/src/main/java/org/apache/spark/network/server/TransportServer.java

diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
deleted file mode 100644
index e69de29bb2d1d..0000000000000
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
deleted file mode 100644
index e69de29bb2d1d..0000000000000

From 7cddd6c335a2a961a3b9e8061ad32f57a6a5063a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?=
Date: Wed, 12 Apr 2017 14:02:58 -0500
Subject: [PATCH 6/8] Doc changes and minor tweaks in preparation for PR

---
 .../org/apache/spark/executor/Executor.scala     |  7 ++---
 .../spark/executor/ExecutorMetrics.scala         |  2 --
 .../scheduler/EventLoggingListener.scala         | 27 ++++++++++---------
 .../apache/spark/ui/memory/MemoryTab.scala       | 21 ++++++++-------
 .../spark/storage/BlockManagerSuite.scala        |  2 +-
 5 files changed, 30 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index bb5b9d69c58cf..6342a098c919e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -713,9 +713,10 @@ private[spark] class Executor(
     env.blockTransferService.updateMemMetrics(this.executorMetrics)
     val executorMetrics = if (isLocal) {
       // When running locally, there is a chance that the executorMetrics could change
-      // out from under us. So, copy them here, using serialization and deserialization
-      // to create a new object.
-      // TODO: Find a better way of doing this.
+      // out from under us. So, copy them here. In non-local mode this object would be
+      // serialized and de-serialized on its way to the driver. Perform that operation here
+      // to obtain the same result as non-local mode.
+      // TODO - Add a test that fails in local mode if we don't copy executorMetrics here.
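+      // (A serialize/deserialize round trip yields a deep copy of the metrics object.)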
       Utils.deserialize[ExecutorMetrics](Utils.serialize(this.executorMetrics))
     } else {
       this.executorMetrics

diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
index eed8f3ab230d6..5d03ba6df96a6 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
@@ -21,7 +21,6 @@ import org.apache.spark.annotation.DeveloperApi

 /**
  * :: DeveloperApi ::
- * Metrics tracked during the execution of an executor.
  */
 @DeveloperApi
 class ExecutorMetrics extends Serializable {
@@ -64,7 +63,6 @@ object ExecutorMetrics extends Serializable {

 /**
  * :: DeveloperApi ::
- * Metrics for network layer
  */
 @DeveloperApi
 case class TransportMetrics (

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 217c316358d50..4680e1bfcfcca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -303,22 +303,23 @@ private[spark] class EventLoggingListener(
       case None => executorIdToModifiedMaxMetrics(executorId) = latestEvent
       case Some(toBeModifiedEvent) =>
-        val toBeModifiedMetrics = toBeModifiedEvent.executorMetrics.transportMetrics
-        val latestTransMetrics = latestEvent.executorMetrics.transportMetrics
-        val toBeModTransMetrics = toBeModifiedMetrics
-        var timeStamp: Long = toBeModTransMetrics.timeStamp
-        // the logic here should be the same with that for memoryListener
-        val onHeapSize = if (latestTransMetrics.onHeapSize > toBeModTransMetrics.onHeapSize) {
-          timeStamp = latestTransMetrics.timeStamp
-          latestTransMetrics.onHeapSize
+        val toBeModifiedTransportMetrics = toBeModifiedEvent.executorMetrics.transportMetrics
+        val latestTransportMetrics = latestEvent.executorMetrics.transportMetrics
+        var timeStamp: Long = toBeModifiedTransportMetrics.timeStamp
+
+        val onHeapSize = if
+          (latestTransportMetrics.onHeapSize > toBeModifiedTransportMetrics.onHeapSize) {
+          timeStamp = latestTransportMetrics.timeStamp
+          latestTransportMetrics.onHeapSize
         } else {
-          toBeModTransMetrics.onHeapSize
+          toBeModifiedTransportMetrics.onHeapSize
         }
-        val offHeapSize = if (latestTransMetrics.offHeapSize > toBeModTransMetrics.offHeapSize) {
-          timeStamp = latestTransMetrics.timeStamp
-          latestTransMetrics.offHeapSize
+        val offHeapSize =
+          if (latestTransportMetrics.offHeapSize > toBeModifiedTransportMetrics.offHeapSize) {
+          timeStamp = latestTransportMetrics.timeStamp
+          latestTransportMetrics.offHeapSize
         } else {
-          toBeModTransMetrics.offHeapSize
+          toBeModifiedTransportMetrics.offHeapSize
         }
         val modifiedExecMetrics = ExecutorMetrics(toBeModifiedEvent.executorMetrics.hostname,
           toBeModifiedEvent.executorMetrics.port,

diff --git a/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala
index cc23334860a5a..8bdd25c188afb 100644
--- a/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala
@@ -34,29 +34,30 @@ private[ui] class MemoryTab(parent: SparkUI) extends SparkUITab(parent, "memory"

 /**
  * :: DeveloperApi ::
- * A SparkListener that prepares information to be displayed on the MemoryTab
+ * A SparkListener that prepares information to be displayed on the MemoryTab.
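+ * It tracks network-layer (Netty) memory usage for each executor, both per stage and overall.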
  */
 @DeveloperApi
 class MemoryListener extends SparkListener {
   type ExecutorId = String
   val activeExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
-  // TODO There might be plenty of removed executors (e.g. Dynamic Allocation Mode), This may use
-  // too much memory.
+  // TODO This may use too much memory.
+  // There may be many removed executors (e.g. in Dynamic Allocation Mode).
   val removedExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
-  // A map that maintains the latest metrics of each active executor
+  // A map that maintains the latest metrics of each active executor.
   val latestExecIdToExecMetrics = new HashMap[ExecutorId, ExecutorMetrics]
-  // activeStagesToMem a map maintains all executors memory information of each stage,
-  // the Map type is [(stageId, attemptId), Seq[(executorId, MemoryUIInfo)]
+  // A map that maintains the memory information of every executor for each stage,
+  // keyed by (stageId, attemptId).
   val activeStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]]
-  // TODO We need to get conf of the retained stages so that we don't need to handle all the
-  // stages since there might be too many completed stages.
+  // TODO Get conf of the retained stages so that we don't need to handle them all.
+  // There may be many completed stages.
   val completedStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]]

   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
     val executorId = event.execId
     val executorMetrics = event.executorMetrics
-    val memoryInfo = activeExecutorIdToMem.getOrElseUpdate(executorId, new MemoryUIInfo)
-    memoryInfo.updateMemUiInfo(executorMetrics)
+    activeExecutorIdToMem
+      .getOrElseUpdate(executorId, new MemoryUIInfo)
+      .updateMemUiInfo(executorMetrics)
     activeStagesToMem.foreach { case (_, stageMemMetrics) =>
       // If executor is added in the stage running time, we also update the metrics for the
       // executor in {{activeStagesToMem}}
       if (!stageMemMetrics.contains(executorId)) {
         stageMemMetrics(executorId) = new MemoryUIInfo
       }

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e84c30f2e56b3..e1b8374dd3034 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1257,7 +1257,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       listener.onBlockFetchSuccess("mockBlockId", new NioManagedBuffer(ByteBuffer.allocate(1)))
     }

-    override def getMemMetrics(executorMetrics: ExecutorMetrics): Unit = {}
+    override def updateMemMetrics(executorMetrics: ExecutorMetrics): Unit = {}

     override def close(): Unit = {}

From 1bb14b476af6d55ce0c53b9b9b40946f92cac1c9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?=
Date: Wed, 12 Apr 2017 16:53:43 -0500
Subject: [PATCH 7/8] s/Happen Time/Peak Time

---
 .../main/scala/org/apache/spark/ui/memory/MemoryTable.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/memory/MemoryTable.scala b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTable.scala
index 11a1f0dc321ef..74036ba44b0f8 100644
--- a/core/src/main/scala/org/apache/spark/ui/memory/MemoryTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTable.scala
@@ -36,8 +36,8 @@ private[ui] class MemTableBase(
     <th>Address</th>
     <th>Network Memory (on-heap)</th>
     <th>Network Memory (off-heap)</th>
-    <th>Peak Network Memory (on-heap) / Happen Time</th>
-    <th>Peak Network Read (off-heap) / Happen Time</th>
+    <th>Peak Network Memory (on-heap) / Peak Time</th>
+    <th>Peak Network Read (off-heap) / Peak Time</th>
   }

   def toNodeSeq: Seq[Node] = {

From 577d44211888232541a39026850ab7aa7b125e70 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?=
Date: Mon, 17 Apr 2017 11:02:39 -0500
Subject: [PATCH 8/8] Respond to some code review feedback.

Make executorMetrics an Option.

---
 .../netty/NettyBlockTransferService.scala        |  2 +-
 .../apache/spark/scheduler/DAGScheduler.scala    |  3 +-
 .../scheduler/EventLoggingListener.scala         | 59 +++++++++++--------
 .../spark/scheduler/SparkListener.scala          |  6 +-
 .../apache/spark/ui/memory/MemoryTab.scala       |  6 +-
 .../org/apache/spark/util/JsonProtocol.scala     |  8 ++-
 project/MimaExcludes.scala                       |  9 ++-
 7 files changed, 57 insertions(+), 36 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 8201fe323b3e0..1a8911a0f9a59 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -18,11 +18,11 @@
 package org.apache.spark.network.netty

 import java.nio.ByteBuffer
+import java.util.{List => JList}

 import scala.collection.JavaConverters._
 import scala.concurrent.{Future, Promise}
 import scala.reflect.ClassTag
-import scala.tools.nsc.interpreter.JList

 import io.netty.buffer._

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index afd13bc09b939..829ae7be2b7e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -239,7 +239,8 @@ class DAGScheduler(
       // (taskId, stageId, stageAttemptId, accumUpdates)
       accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
       blockManagerId: BlockManagerId): Boolean = {
-    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, executorMetrics, accumUpdates))
+    listenerBus.post(
+      SparkListenerExecutorMetricsUpdate(execId, accumUpdates, Some(executorMetrics)))
     blockManagerMaster.driverEndpoint.askSync[Boolean](
       BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
   }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 7d03fe64925ac..c8e5ef1872456 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -89,8 +89,8 @@ private[spark] class EventLoggingListener(
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
   private val executorIdToLatestMetrics = new HashMap[String, SparkListenerExecutorMetricsUpdate]
-  private val executorIdToModifiedMaxMetrics = new
-    HashMap[String, SparkListenerExecutorMetricsUpdate]
+  private val executorIdToModifiedMaxMetrics =
+    new HashMap[String, SparkListenerExecutorMetricsUpdate]

   /**
    * Creates the log file in the configured log directory.
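The updateModifiedMetrics rework below keeps, per executor, the peak on-heap and off-heap network
memory observed so far, stamping each peak with the time it was seen. A minimal standalone sketch
of that merge rule, using a simplified stand-in case class rather than the ExecutorMetrics and
TransportMetrics types from this series:

    case class Transport(timeStamp: Long, onHeapSize: Long, offHeapSize: Long)

    def mergePeak(prev: Transport, latest: Transport): Transport = {
      // Keep the larger of each size; timeStamp records when the last new peak appeared.
      var timeStamp = prev.timeStamp
      val onHeap =
        if (latest.onHeapSize > prev.onHeapSize) {
          timeStamp = latest.timeStamp
          latest.onHeapSize
        } else {
          prev.onHeapSize
        }
      val offHeap =
        if (latest.offHeapSize > prev.offHeapSize) {
          timeStamp = latest.timeStamp
          latest.offHeapSize
        } else {
          prev.offHeapSize
        }
      Transport(timeStamp, onHeap, offHeap)
    }

A single timestamp covers both sizes, so it reflects whichever peak moved most recently; this
matches the behavior of the hunk that follows.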
@@ -242,7 +242,7 @@ private[spark] class EventLoggingListener(
     // We only track the executor metrics in each stage, so we drop the task metrics as they are
     // quite verbose
     val eventWithoutTaskMetrics = SparkListenerExecutorMetricsUpdate(
-      event.execId, event.executorMetrics, Seq.empty)
+      event.execId, Seq.empty, event.executorMetrics)
     executorIdToLatestMetrics(eventWithoutTaskMetrics.execId) = eventWithoutTaskMetrics
     updateModifiedMetrics(eventWithoutTaskMetrics)
   }
@@ -295,6 +295,9 @@ private[spark] class EventLoggingListener(
    * Does this event match the ID of an executor we are already tracking?
    * If no, start tracking metrics for this executor, starting at this event.
    * If yes, compare time stamps, and perhaps update using this event.
+   * Only do this if executorMetrics is present in both the stored event and the
+   * latest event. If either is missing - meaning we are processing historical data
+   * created without executorMetrics - simply cache the latestEvent.
    * @param latestEvent the latest event received, used to update our map of stored metrics.
    */
   private def updateModifiedMetrics(latestEvent: SparkListenerExecutorMetricsUpdate): Unit = {
@@ -304,30 +307,36 @@ private[spark] class EventLoggingListener(
       case None => executorIdToModifiedMaxMetrics(executorId) = latestEvent
       case Some(toBeModifiedEvent) =>
-        val toBeModifiedTransportMetrics = toBeModifiedEvent.executorMetrics.transportMetrics
-        val latestTransportMetrics = latestEvent.executorMetrics.transportMetrics
-        var timeStamp: Long = toBeModifiedTransportMetrics.timeStamp
-
-        val onHeapSize = if
-          (latestTransportMetrics.onHeapSize > toBeModifiedTransportMetrics.onHeapSize) {
-          timeStamp = latestTransportMetrics.timeStamp
-          latestTransportMetrics.onHeapSize
-        } else {
-          toBeModifiedTransportMetrics.onHeapSize
-        }
-        val offHeapSize =
-          if (latestTransportMetrics.offHeapSize > toBeModifiedTransportMetrics.offHeapSize) {
-          timeStamp = latestTransportMetrics.timeStamp
-          latestTransportMetrics.offHeapSize
-        } else {
-          toBeModifiedTransportMetrics.offHeapSize
-        }
+        if (toBeModifiedEvent.executorMetrics.isEmpty ||
+            latestEvent.executorMetrics.isEmpty) {
+          executorIdToModifiedMaxMetrics(executorId) = latestEvent
+        } else {
+          val prevTransportMetrics = toBeModifiedEvent.executorMetrics.get.transportMetrics
+          val latestTransportMetrics = latestEvent.executorMetrics.get.transportMetrics
+          var timeStamp: Long = prevTransportMetrics.timeStamp
+
+          val onHeapSize =
+            if (latestTransportMetrics.onHeapSize > prevTransportMetrics.onHeapSize) {
+              timeStamp = latestTransportMetrics.timeStamp
+              latestTransportMetrics.onHeapSize
+            } else {
+              prevTransportMetrics.onHeapSize
+            }
+          val offHeapSize =
+            if (latestTransportMetrics.offHeapSize > prevTransportMetrics.offHeapSize) {
+              timeStamp = latestTransportMetrics.timeStamp
+              latestTransportMetrics.offHeapSize
+            } else {
+              prevTransportMetrics.offHeapSize
+            }
+          val updatedExecMetrics = ExecutorMetrics(toBeModifiedEvent.executorMetrics.get.hostname,
+            toBeModifiedEvent.executorMetrics.get.port,
+            TransportMetrics(timeStamp, onHeapSize, offHeapSize))
+          val modifiedEvent = SparkListenerExecutorMetricsUpdate(
+            toBeModifiedEvent.execId, toBeModifiedEvent.accumUpdates, Some(updatedExecMetrics))
+          executorIdToModifiedMaxMetrics(executorId) = modifiedEvent
+        }
-        val modifiedExecMetrics = ExecutorMetrics(toBeModifiedEvent.executorMetrics.hostname,
-          toBeModifiedEvent.executorMetrics.port,
-          TransportMetrics(timeStamp, onHeapSize, offHeapSize))
-        val modifiedEvent = SparkListenerExecutorMetricsUpdate(
-          toBeModifiedEvent.execId, modifiedExecMetrics, toBeModifiedEvent.accumUpdates)
-        executorIdToModifiedMaxMetrics(executorId) = modifiedEvent
     }
   }
 }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 8f9bd40af6d47..e8859aaf3e64f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -138,14 +138,14 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
 /**
  * Periodic updates from executors.
  * @param execId executor id
- * @param executorMetrics metrics in executor level
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates)
+ * @param executorMetrics keeps track of TransportMetrics for an executor. Added in Spark 2.3.
 */
@DeveloperApi
case class SparkListenerExecutorMetricsUpdate(
    execId: String,
-    executorMetrics: ExecutorMetrics,
-    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+    executorMetrics: Option[ExecutorMetrics])
  extends SparkListenerEvent

@DeveloperApi

diff --git a/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala
index 8bdd25c188afb..ce008d4c26cfe 100644
--- a/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala
@@ -57,16 +57,16 @@ class MemoryListener extends SparkListener {
     val executorMetrics = event.executorMetrics
     activeExecutorIdToMem
       .getOrElseUpdate(executorId, new MemoryUIInfo)
-      .updateMemUiInfo(executorMetrics)
+      .updateMemUiInfo(executorMetrics.get)
     activeStagesToMem.foreach { case (_, stageMemMetrics) =>
       // If executor is added in the stage running time, we also update the metrics for the
       // executor in {{activeStagesToMem}}
       if (!stageMemMetrics.contains(executorId)) {
         stageMemMetrics(executorId) = new MemoryUIInfo
       }
-      stageMemMetrics(executorId).updateMemUiInfo(executorMetrics)
+      stageMemMetrics(executorId).updateMemUiInfo(executorMetrics.get)
     }
-    latestExecIdToExecMetrics(executorId) = executorMetrics
+    latestExecIdToExecMetrics(executorId) = executorMetrics.get
   }

   override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c970a5eb86578..3073584f28b55 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -237,9 +237,13 @@ private[spark] object JsonProtocol {
     val execId = metricsUpdate.execId
     val executorMetrics = metricsUpdate.executorMetrics
     val accumUpdates = metricsUpdate.accumUpdates
+    val metricsJson: JValue = executorMetrics match {
+      case Some(metrics) => executorMetricsToJson(metrics)
+      case None => "none"
+    }
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
     ("Executor ID" -> execId) ~
-    ("Executor Metrics Updated" -> executorMetricsToJson(executorMetrics)) ~
+    ("Executor Metrics Updated" -> metricsJson) ~
     ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
       ("Task ID" -> taskId) ~
       ("Stage ID" -> stageId) ~
@@ -688,7 +692,7 @@ private[spark] object JsonProtocol {
       (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson)
       (taskId, stageId, stageAttemptId, updates)
     }
-    SparkListenerExecutorMetricsUpdate(execInfo, executorMetrics, accumUpdates)
+    SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates, Some(executorMetrics))
   }

   /** --------------------------------------------------------------------- *

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index feae76a087dec..ed6f89e93bc96 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -108,7 +108,14 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerBlockManagerAdded.apply"),
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.StorageStatus.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatus.this"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.RDDDataDistribution.this")
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.RDDDataDistribution.this"),
+
+      // [SPARK-9103] Update SparkListenerExecutorMetricsUpdate with new executorMetrics
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.apply"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.this"),
+      ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate$"),
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy$default$2")
     )

     // Exclude rules for 2.1.x
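With executorMetrics now an Option, listeners and event-log replay must tolerate events that carry
no metrics (for example, history files written before this change). A hypothetical consumer
sketch, assuming only the SparkListenerExecutorMetricsUpdate shape introduced above; the listener
name is illustrative:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

    class NetworkMemoryLogger extends SparkListener {
      override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
        // Option.foreach silently skips events replayed from logs that predate executorMetrics.
        event.executorMetrics.foreach { metrics =>
          val transport = metrics.transportMetrics
          println(s"executor ${event.execId}: " +
            s"onHeap=${transport.onHeapSize} offHeap=${transport.offHeapSize}")
        }
      }
    }

Matching on the Option (rather than calling .get, as the MemoryListener hunk above does) is what
keeps such a consumer safe against pre-2.3 event logs.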