Skip to content

Commit f76de44

Browse files
committed
Improve SparkStatusTracker to also track executor information
1 parent f58319a commit f76de44

File tree

3 files changed

+26
-2
lines changed

3 files changed

+26
-2
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
147147
appName: String,
148148
sparkHome: String = null,
149149
jars: Seq[String] = Nil,
150-
environment: Map[String, String] = Map()) =
151-
{
150+
environment: Map[String, String] = Map()) = {
152151
this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
153152
}
154153

core/src/main/scala/org/apache/spark/SparkStatusTracker.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark
1919

20+
import org.apache.spark.scheduler.TaskSchedulerImpl
21+
2022
/**
2123
* Low-level status reporting APIs for monitoring job and stage progress.
2224
*
@@ -104,4 +106,25 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
104106
}
105107
}
106108
}
109+
110+
/**
 * Lists the executors currently reported by `sc.getExecutorStorageStatus`.
 *
 * @return the `hostPort` of each reported storage status's block manager id, in the order
 *         the statuses are returned
 */
def getExecutors(): Array[String] =
  for (storageStatus <- sc.getExecutorStorageStatus)
    yield storageStatus.blockManagerId.hostPort
115+
116+
/**
 * Reports the `memUsed` value of every storage status returned by
 * `sc.getExecutorStorageStatus`.
 *
 * @return a map from each block manager's `hostPort` to the `memUsed` of its storage status
 */
def getCacheSizeByExecutors(): Map[String, Long] = {
  val usageByHostPort = sc.getExecutorStorageStatus.map { storage =>
    (storage.blockManagerId.hostPort, storage.memUsed)
  }
  usageByHostPort.toMap
}
121+
122+
/**
 * Reports how many tasks are running on each executor listed by
 * `sc.getExecutorStorageStatus`.
 *
 * Counts come from `TaskSchedulerImpl.runningTasksByExecutors()`, which is keyed by executor
 * id; they are re-keyed here by the block manager's `hostPort`. An executor with no entry in
 * the scheduler's map is reported with a count of 0.
 *
 * If the active task scheduler is not a `TaskSchedulerImpl`, no counts are available and
 * every executor is reported with 0 rather than failing with a `ClassCastException`.
 *
 * @return a map from each block manager's `hostPort` to its number of running tasks
 */
def getRunningTasksByExecutors(): Map[String, Int] = {
  // A typed pattern (unlike asInstanceOf) also rejects null, so a missing scheduler falls
  // through to the empty map instead of raising a NullPointerException.
  val runningTasksById: Map[String, Int] = sc.taskScheduler match {
    case scheduler: TaskSchedulerImpl => scheduler.runningTasksByExecutors()
    case _ => Map.empty
  }
  sc.getExecutorStorageStatus.map { status =>
    val bmId = status.blockManagerId
    bmId.hostPort -> runningTasksById.getOrElse(bmId.executorId, 0)
  }.toMap
}
107130
}

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ private[spark] class TaskSchedulerImpl(
9090
// Number of tasks running on each executor
9191
private val executorIdToTaskCount = new HashMap[String, Int]
9292

93+
/**
 * Returns an immutable snapshot of `executorIdToTaskCount`: the number of tasks running on
 * each executor, keyed by executor id.
 *
 * `SparkStatusTracker.getRunningTasksByExecutors` calls this from outside the scheduler, and
 * the backing map is a mutable `HashMap`, which is not safe to iterate while it is being
 * modified. The copy is therefore made while holding this scheduler's monitor.
 * NOTE(review): this only helps if every writer of `executorIdToTaskCount` also holds this
 * object's monitor -- confirm against the rest of the class.
 *
 * @return a copy of the per-executor running-task counts; later changes in the scheduler are
 *         not reflected in the returned map
 */
def runningTasksByExecutors(): Map[String, Int] = synchronized {
  executorIdToTaskCount.toMap
}
94+
9395
// The set of executors we have on each host; this is used to compute hostsAlive, which
9496
// in turn is used to decide when we can attain data locality on a given host
9597
protected val executorsByHost = new HashMap[String, HashSet[String]]

0 commit comments

Comments
 (0)