From f76de444e184923fe9e69a1447a8a3ff09bef07c Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 22 Mar 2016 22:43:47 +0800
Subject: [PATCH 1/4] Improve SparkStatusTracker to also track executor
 information

---
 .../scala/org/apache/spark/SparkContext.scala |  3 +--
 .../org/apache/spark/SparkStatusTracker.scala | 23 +++++++++++++++++++
 .../spark/scheduler/TaskSchedulerImpl.scala   |  2 ++
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d2cf3bfd60ee3..007dbc62aeead 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -147,8 +147,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       appName: String,
       sparkHome: String = null,
       jars: Seq[String] = Nil,
-      environment: Map[String, String] = Map()) =
-  {
+      environment: Map[String, String] = Map()) = {
     this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
   }
 
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 34ee3a48f8e74..260c1d5b0d53d 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
 /**
  * Low-level status reporting APIs for monitoring job and stage progress.
  *
@@ -104,4 +106,25 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
       }
     }
   }
+
+  def getExecutors(): Array[String] = {
+    sc.getExecutorStorageStatus.map { status =>
+      status.blockManagerId.hostPort
+    }
+  }
+
+  def getCacheSizeByExecutors(): Map[String, Long] = {
+    sc.getExecutorStorageStatus.map { status =>
+      status.blockManagerId.hostPort -> status.memUsed
+    }.toMap
+  }
+
+  def getRunningTasksByExecutors(): Map[String, Int] = {
+    val executorIdToRunningTasks: Map[String, Int] =
+      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors()
+    sc.getExecutorStorageStatus.map { status =>
+      val bmId = status.blockManagerId
+      bmId.hostPort -> executorIdToRunningTasks.getOrElse(bmId.executorId, 0)
+    }.toMap
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index f7790fccc683c..daed2ff50e157 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -90,6 +90,8 @@ private[spark] class TaskSchedulerImpl(
   // Number of tasks running on each executor
   private val executorIdToTaskCount = new HashMap[String, Int]
 
+  def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap
+
   // The set of executors we have on each host; this is used to compute hostsAlive, which
   // in turn is used to decide when we can attain data locality on a given host
   protected val executorsByHost = new HashMap[String, HashSet[String]]

From fe8039013efd2e4504168dff264f725fb56665a9 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 23 Mar 2016 11:53:19 +0800
Subject: [PATCH 2/4] update

---
 .../scala/org/apache/spark/SparkStatusTracker.scala   | 11 ++++++++++-
 .../scala/org/apache/spark/storage/StorageUtils.scala |  5 ++++-
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 260c1d5b0d53d..e586c3c24b8ae 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -107,18 +107,27 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
     }
   }
 
+  /**
+   * Returns a list of all known executors, represented as "host:port" strings.
+   */
   def getExecutors(): Array[String] = {
     sc.getExecutorStorageStatus.map { status =>
       status.blockManagerId.hostPort
     }
   }
 
+  /**
+   * Returns a map from executor id ("host:port") to its cache size (memory used for caching RDDs).
+   */
  def getCacheSizeByExecutors(): Map[String, Long] = {
     sc.getExecutorStorageStatus.map { status =>
-      status.blockManagerId.hostPort -> status.memUsed
+      status.blockManagerId.hostPort -> status.cacheSize
     }.toMap
   }
 
+  /**
+   * Returns a map from executor id ("host:port") to its number of running tasks.
+   */
   def getRunningTasksByExecutors(): Map[String, Int] = {
     val executorIdToRunningTasks: Map[String, Int] =
       sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors()
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 199a5fc270a41..fb9941bbd9e0f 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -175,7 +175,10 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
   def memRemaining: Long = maxMem - memUsed
 
   /** Return the memory used by this block manager. */
-  def memUsed: Long = _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
+  def memUsed: Long = _nonRddStorageInfo._1 + cacheSize
+
+  /** Return the memory used for caching RDDs. */
+  def cacheSize: Long = _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
 
   /** Return the disk space used by this block manager. */
   def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum

From 0b9400e2a62f6a723be79264dfeddf947755f3d0 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 30 Mar 2016 23:14:09 +0800
Subject: [PATCH 3/4] rename

---
 core/src/main/scala/org/apache/spark/SparkStatusTracker.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index e586c3c24b8ae..c0725d381333d 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -110,7 +110,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
   /**
    * Returns a list of all known executors, represented as "host:port" strings.
    */
-  def getExecutors(): Array[String] = {
+  def getExecutorList(): Array[String] = {
     sc.getExecutorStorageStatus.map { status =>
       status.blockManagerId.hostPort
     }

From 39dd0eed15d9c67604cb3430350c4f0f57c5f7e7 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 31 Mar 2016 10:10:18 +0800
Subject: [PATCH 4/4] update

---
 .../org/apache/spark/SparkExecutorInfo.java   | 33 +++++++++++++++++++
 .../org/apache/spark/SparkStatusTracker.scala | 32 ++++++------------
 .../org/apache/spark/StatusAPIImpl.scala      | 33 +++++++++++--------
 3 files changed, 63 insertions(+), 35 deletions(-)
 create mode 100644 core/src/main/java/org/apache/spark/SparkExecutorInfo.java

diff --git a/core/src/main/java/org/apache/spark/SparkExecutorInfo.java b/core/src/main/java/org/apache/spark/SparkExecutorInfo.java
new file mode 100644
index 0000000000000..dc3e826475987
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/SparkExecutorInfo.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+
+/**
+ * Exposes information about Spark Executors.
+ *
+ * This interface is not designed to be implemented outside of Spark.  We may add additional
+ * methods which may break binary compatibility with outside implementations.
+ */
+public interface SparkExecutorInfo extends Serializable {
+  String host();
+  int port();
+  long cacheSize();
+  int numRunningTasks();
+}
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index c0725d381333d..52c4656c271bc 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -108,32 +108,20 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
   }
 
   /**
-   * Returns a list of all known executors, represented as "host:port" strings.
+   * Returns information about all known executors, including host, port, cacheSize, numRunningTasks.
    */
-  def getExecutorList(): Array[String] = {
-    sc.getExecutorStorageStatus.map { status =>
-      status.blockManagerId.hostPort
-    }
-  }
-
-  /**
-   * Returns a map from executor id ("host:port") to its cache size (memory used for caching RDDs).
-   */
-  def getCacheSizeByExecutors(): Map[String, Long] = {
-    sc.getExecutorStorageStatus.map { status =>
-      status.blockManagerId.hostPort -> status.cacheSize
-    }.toMap
-  }
-
-  /**
-   * Returns a map from executor id ("host:port") to its number of running tasks.
-   */
-  def getRunningTasksByExecutors(): Map[String, Int] = {
+  def getExecutorInfos: Array[SparkExecutorInfo] = {
     val executorIdToRunningTasks: Map[String, Int] =
       sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors()
+
     sc.getExecutorStorageStatus.map { status =>
       val bmId = status.blockManagerId
-      bmId.hostPort -> executorIdToRunningTasks.getOrElse(bmId.executorId, 0)
-    }.toMap
+      new SparkExecutorInfoImpl(
+        bmId.host,
+        bmId.port,
+        status.cacheSize,
+        executorIdToRunningTasks.getOrElse(bmId.executorId, 0)
+      )
+    }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
index e5c7c8d0db578..c1f24a6377788 100644
--- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
+++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
@@ -18,18 +18,25 @@
 package org.apache.spark
 
 private class SparkJobInfoImpl (
-  val jobId: Int,
-  val stageIds: Array[Int],
-  val status: JobExecutionStatus)
-extends SparkJobInfo
+    val jobId: Int,
+    val stageIds: Array[Int],
+    val status: JobExecutionStatus)
+  extends SparkJobInfo
 
 private class SparkStageInfoImpl(
-  val stageId: Int,
-  val currentAttemptId: Int,
-  val submissionTime: Long,
-  val name: String,
-  val numTasks: Int,
-  val numActiveTasks: Int,
-  val numCompletedTasks: Int,
-  val numFailedTasks: Int)
-extends SparkStageInfo
+    val stageId: Int,
+    val currentAttemptId: Int,
+    val submissionTime: Long,
+    val name: String,
+    val numTasks: Int,
+    val numActiveTasks: Int,
+    val numCompletedTasks: Int,
+    val numFailedTasks: Int)
+  extends SparkStageInfo
+
+private class SparkExecutorInfoImpl(
+    val host: String,
+    val port: Int,
+    val cacheSize: Long,
+    val numRunningTasks: Int)
+  extends SparkExecutorInfo
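
A minimal usage sketch of the API this series ends up with (not part of the patches themselves). The local-mode master, app name, demo object name, and the cached RDD are illustrative assumptions; SparkContext.statusTracker, getExecutorInfos, and the SparkExecutorInfo accessors are the ones the diffs above introduce.

import org.apache.spark.{SparkConf, SparkContext}

object StatusTrackerDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical local-mode setup, just to have a live SparkContext.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("status-tracker-demo"))

    // Cache an RDD and materialize it so that cacheSize() reports a non-zero value.
    val data = sc.parallelize(1 to 1000000).cache()
    data.count()

    // The new API: one SparkExecutorInfo per known executor.
    for (info <- sc.statusTracker.getExecutorInfos) {
      println(s"executor ${info.host}:${info.port}: " +
        s"cacheSize=${info.cacheSize} bytes, numRunningTasks=${info.numRunningTasks}")
    }

    sc.stop()
  }
}

Note that getExecutorInfos is derived from the block manager storage statuses, so the driver's own block manager appears in the returned array as well; in local mode it is the only entry.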