16 changes: 10 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -661,7 +661,7 @@ class DAGScheduler(
// completion events or stage abort
stageIdToStage -= s.id
jobIdToStageIds -= job.jobId
-      listenerBus.post(SparkListenerJobEnd(job.jobId, jobResult))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), jobResult))
}
}

@@ -710,7 +710,7 @@ class DAGScheduler(
stage.latestInfo.stageFailed(stageFailedMessage)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
}
-      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
}
}

@@ -749,17 +749,20 @@ class DAGScheduler(
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val shouldRunLocally =
localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
+    val jobSubmissionTime = clock.getTime()
if (shouldRunLocally) {
// Compute very short actions like first() or take() with no parent stages locally.
-      listenerBus.post(SparkListenerJobStart(job.jobId, Seq.empty, properties))
+      listenerBus.post(
+        SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
runLocally(job)
} else {
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.resultOfJob = Some(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
-      listenerBus.post(SparkListenerJobStart(job.jobId, stageInfos, properties))
+      listenerBus.post(
+        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
}
}
@@ -965,7 +968,8 @@ class DAGScheduler(
if (job.numFinished == job.numPartitions) {
markStageAsFinished(stage)
cleanupStateForJobAndIndependentStages(job)
-      listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
+      listenerBus.post(
+        SparkListenerJobEnd(job.jobId, clock.getTime(), JobSucceeded))
}

// taskSucceeded runs some user code that might throw an exception. Make sure
@@ -1234,7 +1238,7 @@ class DAGScheduler(
if (ableToCancelStages) {
job.listener.jobFailed(error)
cleanupStateForJobAndIndependentStages(job)
-      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTime(), JobFailed(error)))
}
}

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -59,6 +59,7 @@ case class SparkListenerTaskEnd(
@DeveloperApi
case class SparkListenerJobStart(
jobId: Int,
+    time: Long,
stageInfos: Seq[StageInfo],
properties: Properties = null)
extends SparkListenerEvent {
@@ -68,7 +69,11 @@ case class SparkListenerJobStart(
}

@DeveloperApi
-case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkListenerEvent
+case class SparkListenerJobEnd(
+    jobId: Int,
+    time: Long,
+    jobResult: JobResult)
+  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])
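With `time` on both events, a listener can compute a job's wall-clock duration without sampling the clock itself. A minimal sketch of such a consumer — the listener class and its logging are illustrative, not part of this patch:

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Hypothetical listener that derives job durations from the new time fields.
class JobDurationListener extends SparkListener {
  private val submissionTimes = mutable.Map.empty[Int, Long]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
    submissionTimes(jobStart.jobId) = jobStart.time
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
    submissionTimes.remove(jobEnd.jobId).foreach { start =>
      // Events replayed from pre-1.3.0 logs deserialize with -1; skip those.
      if (start >= 0 && jobEnd.time >= 0) {
        println(s"Job ${jobEnd.jobId} finished in ${jobEnd.time - start} ms")
      }
    }
  }
}
```

Registered with `sc.addSparkListener(new JobDurationListener)`, this reports the same durations the jobs page derives below.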
15 changes: 7 additions & 8 deletions core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -21,7 +21,6 @@ import scala.xml.{Node, NodeSeq}

import javax.servlet.http.HttpServletRequest

-import org.apache.spark.JobExecutionStatus
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.ui.jobs.UIData.JobUIData

@@ -51,13 +50,13 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
val duration: Option[Long] = {
-        job.startTime.map { start =>
-          val end = job.endTime.getOrElse(System.currentTimeMillis())
+        job.submissionTime.map { start =>
+          val end = job.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
}
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
-      val formattedSubmissionTime = job.startTime.map(UIUtils.formatDate).getOrElse("Unknown")
+      val formattedSubmissionTime = job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
val detailUrl =
"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
<tr>
@@ -68,7 +67,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
<div><em>{lastStageDescription}</em></div>
<a href={detailUrl}>{lastStageName}</a>
</td>
-        <td sorttable_customkey={job.startTime.getOrElse(-1).toString}>
+        <td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
{formattedSubmissionTime}
</td>
<td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
@@ -101,11 +100,11 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
val now = System.currentTimeMillis

val activeJobsTable =
-      jobsTable(activeJobs.sortBy(_.startTime.getOrElse(-1L)).reverse)
+      jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse)
val completedJobsTable =
-      jobsTable(completedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
+      jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
val failedJobsTable =
-      jobsTable(failedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
+      jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)

val shouldShowActiveJobs = activeJobs.nonEmpty
val shouldShowCompletedJobs = completedJobs.nonEmpty
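The duration logic in this page handles still-running jobs by falling back to the current time when no completion time exists yet. The same computation, condensed into a standalone function (a sketch; the function name is ours, not the UI code's):

```scala
// Elapsed time of a possibly still-running job. None if the submission time
// was never recorded (e.g. the event was replayed from a pre-1.3.0 log).
def jobDuration(submissionTime: Option[Long], completionTime: Option[Long]): Option[Long] =
  submissionTime.map { start =>
    completionTime.getOrElse(System.currentTimeMillis()) - start
  }
```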
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -153,14 +153,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val jobData: JobUIData =
new JobUIData(
jobId = jobStart.jobId,
-        startTime = Some(System.currentTimeMillis),
-        endTime = None,
+        submissionTime = Option(jobStart.time).filter(_ >= 0),
stageIds = jobStart.stageIds,
jobGroup = jobGroup,
status = JobExecutionStatus.RUNNING)
// Compute (a potential underestimate of) the number of tasks that will be run by this job.
// This may be an underestimate because the job start event references all of the result
-    // stages's transitive stage dependencies, but some of these stages might be skipped if their
+    // stages' transitive stage dependencies, but some of these stages might be skipped if their
// output is available from earlier runs.
// See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
jobData.numTasks = {
@@ -186,7 +185,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
new JobUIData(jobId = jobEnd.jobId)
}
-    jobData.endTime = Some(System.currentTimeMillis())
+    jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)

jobEnd.jobResult match {
case JobSucceeded =>
completedJobs += jobData
@@ -309,7 +309,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val info = taskEnd.taskInfo
// If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task
-    // compeletion event is for. Let's just drop it here. This means we might have some speculation
+    // completion event is for. Let's just drop it here. This means we might have some speculation
// tasks on the web ui that's never marked as complete.
if (info != null && taskEnd.stageAttemptId != -1) {
val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), {
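Note the `Option(jobStart.time).filter(_ >= 0)` idiom above: the JSON protocol (see JsonProtocol.scala below) deserializes a missing timestamp as the sentinel -1, and the filter maps that sentinel back to `None`, so the UI renders "Unknown" rather than a date near the epoch. The pattern in isolation, as a small sketch:

```scala
// Sentinel-to-Option: -1 marks a timestamp absent from an old event log;
// any non-negative value is a valid epoch-millisecond timestamp.
def toTimestamp(raw: Long): Option[Long] = Option(raw).filter(_ >= 0)

toTimestamp(1421191042750L)  // Some(1421191042750)
toTimestamp(-1L)             // None
```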
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -40,15 +40,15 @@ private[jobs] object UIData {

class JobUIData(
var jobId: Int = -1,
-      var startTime: Option[Long] = None,
-      var endTime: Option[Long] = None,
+      var submissionTime: Option[Long] = None,
+      var completionTime: Option[Long] = None,
var stageIds: Seq[Int] = Seq.empty,
var jobGroup: Option[String] = None,
var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN,
/* Tasks */
// `numTasks` is a potential underestimate of the true number of tasks that this job will run.
// This may be an underestimate because the job start event references all of the result
-      // stages's transitive stage dependencies, but some of these stages might be skipped if their
+      // stages' transitive stage dependencies, but some of these stages might be skipped if their
// output is available from earlier runs.
// See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
var numTasks: Int = 0,
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -32,6 +32,7 @@ import org.apache.spark.executor._
import org.apache.spark.scheduler._
import org.apache.spark.storage._
import org.apache.spark._
+import org.apache.hadoop.hdfs.web.JsonUtil

Review comment on this import:

    Note this breaks MapR3 builds: <hadoop.version>1.0.3-mapr-3.0.3</hadoop.version>.
    The MapR3 version would then have to be upgraded to 1.0.3-mapr-3.1.1.

Contributor:

    It doesn't look like this import is even used anywhere, so I think we can probably remove it.

Reply:

    @JoshRosen you're right. Is this something you're looking after, or do you want me to
    raise a PR? Should I make a JIRA? It's breaking my MapR3 build.

Contributor:

    Please file a JIRA, since that helps us to track and prioritize bugs. Feel free to open
    a pull request if removing this import fixes your build.


/**
* Serializes SparkListener events to/from JSON. This protocol provides strong backwards-
@@ -141,6 +142,7 @@ private[spark] object JsonProtocol {
val properties = propertiesToJson(jobStart.properties)
("Event" -> Utils.getFormattedClassName(jobStart)) ~
("Job ID" -> jobStart.jobId) ~
("Submission Time" -> jobStart.time) ~
("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~ // Added in Spark 1.2.0
("Stage IDs" -> jobStart.stageIds) ~
("Properties" -> properties)
@@ -150,6 +152,7 @@
val jobResult = jobResultToJson(jobEnd.jobResult)
("Event" -> Utils.getFormattedClassName(jobEnd)) ~
("Job ID" -> jobEnd.jobId) ~
("Completion Time" -> jobEnd.time) ~
("Job Result" -> jobResult)
}

@@ -492,20 +495,24 @@

def jobStartFromJson(json: JValue): SparkListenerJobStart = {
val jobId = (json \ "Job ID").extract[Int]
+    val submissionTime =
+      Utils.jsonOption(json \ "Submission Time").map(_.extract[Long]).getOrElse(-1L)
val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int])
val properties = propertiesFromJson(json \ "Properties")
// The "Stage Infos" field was added in Spark 1.2.0
val stageInfos = Utils.jsonOption(json \ "Stage Infos")
.map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
}
-    SparkListenerJobStart(jobId, stageInfos, properties)
+    SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
}

def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
val jobId = (json \ "Job ID").extract[Int]
+    val completionTime =
+      Utils.jsonOption(json \ "Completion Time").map(_.extract[Long]).getOrElse(-1L)
val jobResult = jobResultFromJson(json \ "Job Result")
-    SparkListenerJobEnd(jobId, jobResult)
+    SparkListenerJobEnd(jobId, completionTime, jobResult)
}

def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = {
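Because the deserializers read the new fields through `Utils.jsonOption` and default to -1, event logs written by older Spark versions still parse. A self-contained json4s sketch of that defaulting pattern — the hypothetical `extractTime` helper here stands in for the `jsonOption`-plus-`getOrElse` combination used above:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats: Formats = DefaultFormats

// Read a field introduced in a later version: use it if present, otherwise
// fall back to the -1 sentinel, as jobStartFromJson/jobEndFromJson do.
def extractTime(json: JValue, field: String): Long =
  json \ field match {
    case JNothing => -1L
    case value    => value.extract[Long]
  }

val oldEvent = parse("""{"Event": "SparkListenerJobEnd", "Job ID": 20}""")
extractTime(oldEvent, "Completion Time")  // -1L: field absent from a pre-1.3.0 log
```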
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -34,6 +34,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000

+  val jobCompletionTime = 1421191296660L

before {
sc = new SparkContext("local", "SparkListenerSuite")
}
@@ -44,7 +46,7 @@
bus.addListener(counter)

// Listener bus hasn't started yet, so posting events should not increment counter
-    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
assert(counter.count === 0)

// Starting listener bus should flush all buffered events
@@ -54,7 +56,7 @@

// After listener bus has stopped, posting events should not increment counter
bus.stop()
-    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
assert(counter.count === 5)

// Listener bus must not be started twice
@@ -99,7 +101,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers

bus.addListener(blockingListener)
bus.start()
-    bus.post(SparkListenerJobEnd(0, JobSucceeded))
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))

listenerStarted.acquire()
// Listener should be blocked after start
@@ -345,7 +347,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
bus.start()

// Post events to all listeners, and wait until the queue is drained
-    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
+    (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))

// The exception should be caught, and the event should be propagated to other listeners
core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -28,6 +28,8 @@ import org.apache.spark.util.Utils

class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {

+  val jobSubmissionTime = 1421191042750L
+  val jobCompletionTime = 1421191296660L

private def createStageStartEvent(stageId: Int) = {
val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
@@ -46,12 +48,12 @@
val stageInfos = stageIds.map { stageId =>
new StageInfo(stageId, 0, stageId.toString, 0, null, "")
}
-    SparkListenerJobStart(jobId, stageInfos)
+    SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos)
}

private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
val result = if (failed) JobFailed(new Exception("dummy failure")) else JobSucceeded
-    SparkListenerJobEnd(jobId, result)
+    SparkListenerJobEnd(jobId, jobCompletionTime, result)
}

private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean = false) {
31 changes: 27 additions & 4 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -34,6 +34,9 @@ import org.apache.spark.storage._

class JsonProtocolSuite extends FunSuite {

+  val jobSubmissionTime = 1421191042750L
+  val jobCompletionTime = 1421191296660L

test("SparkListenerEvent") {
val stageSubmitted =
SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
@@ -54,9 +57,9 @@ class JsonProtocolSuite extends FunSuite {
val stageIds = Seq[Int](1, 2, 3, 4)
val stageInfos = stageIds.map(x =>
makeStageInfo(x, x * 200, x * 300, x * 400L, x * 500L))
-      SparkListenerJobStart(10, stageInfos, properties)
+      SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
}
-    val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
+    val jobEnd = SparkListenerJobEnd(20, jobCompletionTime, JobSucceeded)
val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
"JVM Information" -> Seq(("GC speed", "9999 objects/s"), ("Java home", "Land of coffee")),
"Spark Properties" -> Seq(("Job throughput", "80000 jobs/s, regardless of job type")),
@@ -247,13 +250,31 @@
val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400, x * 500))
val dummyStageInfos =
stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
-    val jobStart = SparkListenerJobStart(10, stageInfos, properties)
+    val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"})
val expectedJobStart =
-      SparkListenerJobStart(10, dummyStageInfos, properties)
+      SparkListenerJobStart(10, jobSubmissionTime, dummyStageInfos, properties)
assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldEvent))
}

test("SparkListenerJobStart and SparkListenerJobEnd backward compatibility") {
// Prior to Spark 1.3.0, SparkListenerJobStart did not have a "Submission Time" property.
// Also, SparkListenerJobEnd did not have a "Completion Time" property.
val stageIds = Seq[Int](1, 2, 3, 4)
val stageInfos = stageIds.map(x => makeStageInfo(x * 10, x * 20, x * 30, x * 40, x * 50))
val jobStart = SparkListenerJobStart(11, jobSubmissionTime, stageInfos, properties)
val oldStartEvent = JsonProtocol.jobStartToJson(jobStart)
.removeField({ _._1 == "Submission Time"})
val expectedJobStart = SparkListenerJobStart(11, -1, stageInfos, properties)
assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldStartEvent))

val jobEnd = SparkListenerJobEnd(11, jobCompletionTime, JobSucceeded)
val oldEndEvent = JsonProtocol.jobEndToJson(jobEnd)
.removeField({ _._1 == "Completion Time"})
val expectedJobEnd = SparkListenerJobEnd(11, -1, JobSucceeded)
assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
}

/** -------------------------- *
| Helper test running methods |
* --------------------------- */
@@ -1075,6 +1096,7 @@
|{
| "Event": "SparkListenerJobStart",
| "Job ID": 10,
| "Submission Time": 1421191042750,
| "Stage Infos": [
| {
| "Stage ID": 1,
@@ -1349,6 +1371,7 @@
|{
| "Event": "SparkListenerJobEnd",
| "Job ID": 20,
| "Completion Time": 1421191296660,
| "Job Result": {
| "Result": "JobSucceeded"
| }