11 changes: 1 addition & 10 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -58,7 +58,6 @@ import org.apache.spark.status.AppStateStore
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
 import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
-import org.apache.spark.ui.jobs.JobProgressListener
 import org.apache.spark.util._
 
 /**
@@ -197,7 +196,6 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _eventLogDir: Option[URI] = None
   private var _eventLogCodec: Option[String] = None
   private var _env: SparkEnv = _
-  private var _jobProgressListener: JobProgressListener = _
   private var _statusTracker: SparkStatusTracker = _
   private var _progressBar: Option[ConsoleProgressBar] = None
   private var _ui: Option[SparkUI] = None
Expand Down Expand Up @@ -270,8 +268,6 @@ class SparkContext(config: SparkConf) extends Logging {
val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
map.asScala
}
private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener

def statusTracker: SparkStatusTracker = _statusTracker

private[spark] def progressBar: Option[ConsoleProgressBar] = _progressBar
@@ -425,11 +421,6 @@ class SparkContext(config: SparkConf) extends Logging {
 
     if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true")
 
-    // "_jobProgressListener" should be set up before creating SparkEnv because when creating
-    // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
-    _jobProgressListener = new JobProgressListener(_conf)
-    listenerBus.addListener(jobProgressListener)
-
     // Initialize the app state store and listener before SparkEnv is created so that it gets
     // all events.
     _stateStore = AppStateStore.createTempStore(conf, listenerBus)
@@ -444,7 +435,7 @@ class SparkContext(config: SparkConf) extends Logging {
       _conf.set("spark.repl.class.uri", replUri)
     }
 
-    _statusTracker = new SparkStatusTracker(this)
+    _statusTracker = new SparkStatusTracker(this, _stateStore)
 
     _progressBar =
       if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
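Reviewer note (illustrative, not part of the diff): the extra constructor argument is wired in by `SparkContext` itself, so user-facing code keeps going through `sc.statusTracker` unchanged. A minimal sketch of that unchanged caller view, assuming a local master and a placeholder app name:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object StatusTrackerSmokeTest {
  def main(args: Array[String]): Unit = {
    // Placeholder master/app name, for illustration only.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("status-tracker-smoke"))
    try {
      sc.parallelize(1 to 100, 4).count()
      // Same public API as before this patch; the data now comes from the
      // AppStateStore that SparkContext passes to the tracker above.
      println(s"active jobs: ${sc.statusTracker.getActiveJobIds().mkString(",")}")
      println(s"known executors: ${sc.statusTracker.getExecutorInfos.length}")
    } finally {
      sc.stop()
    }
  }
}
```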
79 changes: 37 additions & 42 deletions core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -17,7 +17,11 @@
 
 package org.apache.spark
 
+import java.util.Arrays
+
 import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.status.AppStateStore
+import org.apache.spark.status.api.v1.StageStatus
 
 /**
  * Low-level status reporting APIs for monitoring job and stage progress.
@@ -33,9 +37,7 @@ import org.apache.spark.scheduler.TaskSchedulerImpl
  *
  * NOTE: this class's constructor should be considered private and may be subject to change.
  */
-class SparkStatusTracker private[spark] (sc: SparkContext) {
-
-  private val jobProgressListener = sc.jobProgressListener
+class SparkStatusTracker private[spark] (sc: SparkContext, store: AppStateStore) {
 
   /**
    * Return a list of all known jobs in a particular job group. If `jobGroup` is `null`, then
@@ -46,9 +48,8 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * its result.
    */
   def getJobIdsForGroup(jobGroup: String): Array[Int] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, Seq.empty).toArray
-    }
+    val expected = Option(jobGroup)
+    store.jobsList(null).filter(_.jobGroup == expected).map(_.jobId).toArray
   }
 
   /**
@@ -57,9 +58,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * This method does not guarantee the order of the elements in its result.
    */
   def getActiveStageIds(): Array[Int] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.activeStages.values.map(_.stageId).toArray
-    }
+    store.stageList(Arrays.asList(StageStatus.ACTIVE)).map(_.stageId).toArray
   }
 
   /**
@@ -68,19 +67,18 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * This method does not guarantee the order of the elements in its result.
    */
   def getActiveJobIds(): Array[Int] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.activeJobs.values.map(_.jobId).toArray
-    }
+    store.jobsList(Arrays.asList(JobExecutionStatus.RUNNING)).map(_.jobId).toArray
   }
 
   /**
    * Returns job information, or `None` if the job info could not be found or was garbage collected.
    */
   def getJobInfo(jobId: Int): Option[SparkJobInfo] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.jobIdToData.get(jobId).map { data =>
-        new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status)
-      }
+    try {
+      val job = store.job(jobId)
+      Some(new SparkJobInfoImpl(jobId, job.stageIds.toArray, job.status))
+    } catch {
+      case _: NoSuchElementException => None
     }
   }
 
@@ -89,39 +87,36 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * garbage collected.
    */
   def getStageInfo(stageId: Int): Option[SparkStageInfo] = {
-    jobProgressListener.synchronized {
-      for (
-        info <- jobProgressListener.stageIdToInfo.get(stageId);
-        data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId))
-      ) yield {
-        new SparkStageInfoImpl(
-          stageId,
-          info.attemptId,
-          info.submissionTime.getOrElse(0),
-          info.name,
-          info.numTasks,
-          data.numActiveTasks,
-          data.numCompleteTasks,
-          data.numFailedTasks)
-      }
+    try {
+      val info = store.lastStageAttempt(stageId)
+      Some(new SparkStageInfoImpl(
+        stageId,
+        info.attemptId,
+        info.submissionTime.map(_.getTime()).getOrElse(0L),
+        info.name,
+        info.numTasks,
+        info.numActiveTasks,
+        info.numCompleteTasks,
+        info.numFailedTasks))
+    } catch {
+      case _: NoSuchElementException => None
     }
   }
 
   /**
    * Returns information of all known executors, including host, port, cacheSize, numRunningTasks.
    */
   def getExecutorInfos: Array[SparkExecutorInfo] = {
-    val executorIdToRunningTasks: Map[String, Int] =
-      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors
-
-    sc.getExecutorStorageStatus.map { status =>
-      val bmId = status.blockManagerId
+    store.executorList(true).map { exec =>
+      val (host, port) = exec.hostPort.split(":", 2) match {
+        case Array(h, p) => (h, p.toInt)
+        case Array(h) => (h, -1)
+      }
       new SparkExecutorInfoImpl(
-        bmId.host,
-        bmId.port,
-        status.cacheSize,
-        executorIdToRunningTasks.getOrElse(bmId.executorId, 0)
-      )
-    }
+        host,
+        port,
+        exec.maxMemory,
+        exec.activeTasks)
+    }.toArray
   }
 }
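Reviewer note (not part of the diff): `store.job(...)` and `store.lastStageAttempt(...)` report a missing key by throwing `NoSuchElementException`, which is why `getJobInfo` and `getStageInfo` wrap the read in identical try/catch blocks. A sketch of that pattern factored out, where `read` is a hypothetical stand-in for any store call:

```scala
// Converts a KVStore-style read that throws NoSuchElementException on a
// missing key into an Option-returning lookup.
private def asOption[T](read: => T): Option[T] = {
  try {
    Some(read)
  } catch {
    case _: NoSuchElementException => None
  }
}

// With such a helper, getJobInfo would reduce to:
//   asOption(store.job(jobId)).map { job =>
//     new SparkJobInfoImpl(jobId, job.stageIds.toArray, job.status)
//   }
```

Factoring it out would remove the duplication if more store-backed getters are added later.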
26 changes: 25 additions & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.security.SecureRandom
 import java.security.cert.X509Certificate
 import java.util.Arrays
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
 import java.util.jar.{JarEntry, JarOutputStream}
 import javax.net.ssl._
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
@@ -232,6 +232,30 @@ private[spark] object TestUtils {
     }
   }
 
+  /**
+   * Wait until at least `numExecutors` executors are up, or throw a `TimeoutException` if the
+   * waiting time elapses before that many executors are up. Exposed for testing.
+   *
+   * @param numExecutors the minimum number of executors to wait for
+   * @param timeout time to wait in milliseconds
+   */
+  private[spark] def waitUntilExecutorsUp(
+      sc: SparkContext,
+      numExecutors: Int,
+      timeout: Long): Unit = {
+    val finishTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout)
+    while (System.nanoTime() < finishTime) {
+      if (sc.statusTracker.getExecutorInfos.length > numExecutors) {
+        return
+      }
+      // Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+      // add overhead in the general case.
+      Thread.sleep(10)
+    }
+    throw new TimeoutException(
+      s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
+  }
+
 }
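Reviewer note (illustrative, not part of the diff): a typical caller is a test that starts a multi-executor context and blocks until registration completes. The comparison above is strictly greater-than, presumably because `getExecutorInfos` also reports the driver as an entry. The master string, timeout, and surrounding scaffolding below are made-up example values; since `TestUtils` is `private[spark]`, the snippet assumes it lives in a (hypothetical) subpackage of `org.apache.spark`:

```scala
package org.apache.spark.example // hypothetical test package under org.apache.spark

import org.apache.spark.{SparkConf, SparkContext, TestUtils}

object WaitForExecutorsDemo {
  def main(args: Array[String]): Unit = {
    // Two single-core executors with 1024 MB each (example values).
    val sc = new SparkContext(
      new SparkConf()
        .setMaster("local-cluster[2,1,1024]")
        .setAppName("wait-until-executors-up-demo"))
    try {
      // Blocks until both executors are registered, or throws
      // TimeoutException after 60 seconds.
      TestUtils.waitUntilExecutorsUp(sc, numExecutors = 2, timeout = 60000)
      // ... work that needs both executors goes here ...
    } finally {
      sc.stop()
    }
  }
}
```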
core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
@@ -25,7 +25,6 @@ import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.status.api.v1.StageStatus._
 import org.apache.spark.status.api.v1.TaskSorting._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.jobs.UIData.StageUIData
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class StagesResource extends BaseAppResource {
core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -33,7 +33,6 @@ import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStateStore
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui._
-import org.apache.spark.ui.jobs.UIData.{JobUIData, StageUIData}
 import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished jobs */