33 changes: 33 additions & 0 deletions core/src/main/java/org/apache/spark/SparkExecutorInfo.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

import java.io.Serializable;

/**
* Exposes information about Spark Executors.
*
 * This interface is not designed to be implemented outside of Spark. Methods may be added
 * later that break binary compatibility with outside implementations.
*/
public interface SparkExecutorInfo extends Serializable {
String host();
int port();
long cacheSize();
int numRunningTasks();
}
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -147,8 +147,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
appName: String,
sparkHome: String = null,
jars: Seq[String] = Nil,
environment: Map[String, String] = Map()) =
{
environment: Map[String, String] = Map()) = {
this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
}

20 changes: 20 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -17,6 +17,8 @@

package org.apache.spark

import org.apache.spark.scheduler.TaskSchedulerImpl

/**
* Low-level status reporting APIs for monitoring job and stage progress.
*
@@ -104,4 +106,22 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
}
}
}

/**
* Returns information about all known executors, including host, port, cacheSize, and numRunningTasks.
*/
def getExecutorInfos: Array[SparkExecutorInfo] = {
val executorIdToRunningTasks: Map[String, Int] =
sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors()

sc.getExecutorStorageStatus.map { status =>
val bmId = status.blockManagerId
new SparkExecutorInfoImpl(
bmId.host,
bmId.port,
status.cacheSize,
executorIdToRunningTasks.getOrElse(bmId.executorId, 0)
)
}
}
}
33 changes: 20 additions & 13 deletions core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
@@ -18,18 +18,25 @@
package org.apache.spark

private class SparkJobInfoImpl (
val jobId: Int,
val stageIds: Array[Int],
val status: JobExecutionStatus)
extends SparkJobInfo
val jobId: Int,
val stageIds: Array[Int],
val status: JobExecutionStatus)
extends SparkJobInfo

private class SparkStageInfoImpl(
val stageId: Int,
val currentAttemptId: Int,
val submissionTime: Long,
val name: String,
val numTasks: Int,
val numActiveTasks: Int,
val numCompletedTasks: Int,
val numFailedTasks: Int)
extends SparkStageInfo
val stageId: Int,
val currentAttemptId: Int,
val submissionTime: Long,
val name: String,
val numTasks: Int,
val numActiveTasks: Int,
val numCompletedTasks: Int,
val numFailedTasks: Int)
extends SparkStageInfo

private class SparkExecutorInfoImpl(
val host: String,
val port: Int,
val cacheSize: Long,
val numRunningTasks: Int)
extends SparkExecutorInfo
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -90,6 +90,8 @@ private[spark] class TaskSchedulerImpl(
// Number of tasks running on each executor
private val executorIdToTaskCount = new HashMap[String, Int]

def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap
Review comment (Contributor):
Adding a synchronized here would resolve the thread-safety issue, I think. I'll do this as part of a patch fixing another bug and also touching this line.


// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
protected val executorsByHost = new HashMap[String, HashSet[String]]
core/src/main/scala/org/apache/spark/storage/StorageStatus.scala
@@ -175,7 +175,10 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
def memRemaining: Long = maxMem - memUsed

/** Return the memory used by this block manager. */
def memUsed: Long = _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
def memUsed: Long = _nonRddStorageInfo._1 + cacheSize

/** Return the memory used for caching RDDs. */
def cacheSize: Long = _rddBlocks.keys.toSeq.map(memUsedByRdd).sum

/** Return the disk space used by this block manager. */
def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum