@@ -85,6 +85,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     val jobData: JobUIData =
       new JobUIData(jobStart.jobId, Some(System.currentTimeMillis), None, jobStart.stageIds,
         jobGroup, JobExecutionStatus.RUNNING)
+    // Compute (a potential underestimate of) the number of tasks that will be run by this job:
+    jobData.numTasks = {
+      val allStages = jobStart.stageInfos
+      val missingStages = allStages.filter(_.completionTime.isEmpty)
+      missingStages.map(_.numTasks).sum
+    }
     jobIdToData(jobStart.jobId) = jobData
     activeJobs(jobStart.jobId) = jobData
     for (stageId <- jobStart.stageIds) {
@@ -189,7 +195,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       jobId <- activeJobsDependentOnStage;
       jobData <- jobIdToData.get(jobId)
     ) {
-      jobData.numTasks += stage.numTasks
       jobData.numActiveStages += 1
     }
   }
@@ -245,8 +250,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     execSummary.taskTime += info.duration
     stageData.numActiveTasks -= 1

-    val isRecomputation = stageData.completedIndices.contains(info.index)
-
     val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
       taskEnd.reason match {
         case org.apache.spark.Success =>
@@ -279,9 +282,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         jobData.numActiveTasks -= 1
         taskEnd.reason match {
           case Success =>
-            if (!isRecomputation) {
-              jobData.numCompletedTasks += 1
-            }
+            jobData.numCompletedTasks += 1
           case _ =>
             jobData.numFailedTasks += 1
         }