
Commit d61337f

Merge pull request #844 from markhamstra/priorityRename
Renamed 'priority' to 'jobId' and assorted minor changes
2 parents: 8cae72e + 1630fbf

File tree: 5 files changed, +60 -59 lines


core/src/main/scala/spark/scheduler/ActiveJob.scala

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ import java.util.Properties
  * Tracks information about an active job in the DAGScheduler.
  */
 private[spark] class ActiveJob(
-    val runId: Int,
+    val jobId: Int,
     val finalStage: Stage,
     val func: (TaskContext, Iterator[_]) => _,
     val partitions: Array[Int],

core/src/main/scala/spark/scheduler/DAGScheduler.scala

Lines changed: 42 additions & 42 deletions
@@ -104,11 +104,11 @@ class DAGScheduler(

   private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]

-  val nextRunId = new AtomicInteger(0)
+  val nextJobId = new AtomicInteger(0)

   val nextStageId = new AtomicInteger(0)

-  val idToStage = new TimeStampedHashMap[Int, Stage]
+  val stageIdToStage = new TimeStampedHashMap[Int, Stage]

   val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]

@@ -171,28 +171,28 @@ class DAGScheduler(

   /**
    * Get or create a shuffle map stage for the given shuffle dependency's map side.
-   * The priority value passed in will be used if the stage doesn't already exist with
-   * a lower priority (we assume that priorities always increase across jobs for now).
+   * The jobId value passed in will be used if the stage doesn't already exist with
+   * a lower jobId (jobId always increases across jobs.)
    */
-  private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = {
+  private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], jobId: Int): Stage = {
     shuffleToMapStage.get(shuffleDep.shuffleId) match {
       case Some(stage) => stage
       case None =>
-        val stage = newStage(shuffleDep.rdd, Some(shuffleDep), priority)
+        val stage = newStage(shuffleDep.rdd, Some(shuffleDep), jobId)
         shuffleToMapStage(shuffleDep.shuffleId) = stage
         stage
     }
   }

   /**
    * Create a Stage for the given RDD, either as a shuffle map stage (for a ShuffleDependency) or
-   * as a result stage for the final RDD used directly in an action. The stage will also be given
-   * the provided priority.
+   * as a result stage for the final RDD used directly in an action. The stage will also be
+   * associated with the provided jobId.
    */
   private def newStage(
       rdd: RDD[_],
       shuffleDep: Option[ShuffleDependency[_,_]],
-      priority: Int,
+      jobId: Int,
       callSite: Option[String] = None)
     : Stage =
   {
@@ -203,17 +203,17 @@ class DAGScheduler(
       mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size)
     }
     val id = nextStageId.getAndIncrement()
-    val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority, callSite)
-    idToStage(id) = stage
+    val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
+    stageIdToStage(id) = stage
     stageToInfos(stage) = StageInfo(stage)
     stage
   }

   /**
    * Get or create the list of parent stages for a given RDD. The stages will be assigned the
-   * provided priority if they haven't already been created with a lower priority.
+   * provided jobId if they haven't already been created with a lower jobId.
    */
-  private def getParentStages(rdd: RDD[_], priority: Int): List[Stage] = {
+  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
     val parents = new HashSet[Stage]
     val visited = new HashSet[RDD[_]]
     def visit(r: RDD[_]) {
@@ -224,7 +224,7 @@ class DAGScheduler(
         for (dep <- r.dependencies) {
           dep match {
             case shufDep: ShuffleDependency[_,_] =>
-              parents += getShuffleMapStage(shufDep, priority)
+              parents += getShuffleMapStage(shufDep, jobId)
             case _ =>
               visit(dep.rdd)
           }
@@ -245,7 +245,7 @@ class DAGScheduler(
         for (dep <- rdd.dependencies) {
           dep match {
             case shufDep: ShuffleDependency[_,_] =>
-              val mapStage = getShuffleMapStage(shufDep, stage.priority)
+              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
               if (!mapStage.isAvailable) {
                 missing += mapStage
               }
@@ -282,7 +282,7 @@ class DAGScheduler(
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter,
       properties)
-    return (toSubmit, waiter)
+    (toSubmit, waiter)
   }

   def runJob[T, U: ClassManifest](
@@ -329,8 +329,8 @@ class DAGScheduler(
     val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.partitions.size).toArray
-    eventQueue.put(JobSubmitted(rdd, func2, partitions, false, callSite, listener, properties))
-    return listener.awaitResult()    // Will throw an exception if the job fails
+    eventQueue.put(JobSubmitted(rdd, func2, partitions, allowLocal = false, callSite, listener, properties))
+    listener.awaitResult()    // Will throw an exception if the job fails
   }

   /**
@@ -340,11 +340,11 @@ class DAGScheduler(
   private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
     event match {
       case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) =>
-        val runId = nextRunId.getAndIncrement()
-        val finalStage = newStage(finalRDD, None, runId, Some(callSite))
-        val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties)
+        val jobId = nextJobId.getAndIncrement()
+        val finalStage = newStage(finalRDD, None, jobId, Some(callSite))
+        val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
         clearCacheLocs()
-        logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
+        logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +
           " output partitions (allowLocal=" + allowLocal + ")")
         logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
         logInfo("Parents of final stage: " + finalStage.parents)
@@ -354,7 +354,7 @@ class DAGScheduler(
           runLocally(job)
         } else {
           listenerBus.post(SparkListenerJobStart(job, properties))
-          idToActiveJob(runId) = job
+          idToActiveJob(jobId) = job
           activeJobs += job
           resultStageToJob(finalStage) = job
           submitStage(finalStage)
@@ -375,7 +375,7 @@ class DAGScheduler(
         handleTaskCompletion(completion)

       case TaskSetFailed(taskSet, reason) =>
-        abortStage(idToStage(taskSet.stageId), reason)
+        abortStage(stageIdToStage(taskSet.stageId), reason)

       case StopDAGScheduler =>
         // Cancel any active jobs
@@ -386,7 +386,7 @@ class DAGScheduler(
         }
         return true
     }
-    return false
+    false
   }

   /**
@@ -398,7 +398,7 @@ class DAGScheduler(
     clearCacheLocs()
     val failed2 = failed.toArray
     failed.clear()
-    for (stage <- failed2.sortBy(_.priority)) {
+    for (stage <- failed2.sortBy(_.jobId)) {
       submitStage(stage)
     }
   }
@@ -416,7 +416,7 @@ class DAGScheduler(
     logTrace("failed: " + failed)
     val waiting2 = waiting.toArray
     waiting.clear()
-    for (stage <- waiting2.sortBy(_.priority)) {
+    for (stage <- waiting2.sortBy(_.jobId)) {
       submitStage(stage)
     }
   }
@@ -463,7 +463,7 @@ class DAGScheduler(
    */
   protected def runLocally(job: ActiveJob) {
     logInfo("Computing the requested partition locally")
-    new Thread("Local computation of job " + job.runId) {
+    new Thread("Local computation of job " + job.jobId) {
       override def run() {
         runLocallyWithinThread(job)
       }
@@ -531,7 +531,7 @@ class DAGScheduler(
     }
     // must be run listener before possible NotSerializableException
     // should be "StageSubmitted" first and then "JobEnded"
-    val properties = idToActiveJob(stage.priority).properties
+    val properties = idToActiveJob(stage.jobId).properties
     listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties))

     if (tasks.size > 0) {
@@ -552,7 +552,7 @@ class DAGScheduler(
       myPending ++= tasks
       logDebug("New pending tasks: " + myPending)
       taskSched.submitTasks(
-        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties))
+        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
       if (!stage.submissionTime.isDefined) {
         stage.submissionTime = Some(System.currentTimeMillis())
       }
@@ -569,7 +569,7 @@ class DAGScheduler(
    */
   private def handleTaskCompletion(event: CompletionEvent) {
     val task = event.task
-    val stage = idToStage(task.stageId)
+    val stage = stageIdToStage(task.stageId)

     def markStageAsFinished(stage: Stage) = {
       val serviceTime = stage.submissionTime match {
@@ -598,7 +598,7 @@ class DAGScheduler(
             job.numFinished += 1
             // If the whole job has finished, remove it
             if (job.numFinished == job.numPartitions) {
-              idToActiveJob -= stage.priority
+              idToActiveJob -= stage.jobId
               activeJobs -= job
               resultStageToJob -= stage
               markStageAsFinished(stage)
@@ -635,7 +635,7 @@ class DAGScheduler(
             mapOutputTracker.registerMapOutputs(
               stage.shuffleDep.get.shuffleId,
               stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
-              true)
+              changeEpoch = true)
           }
           clearCacheLocs()
           if (stage.outputLocs.count(_ == Nil) != 0) {
@@ -669,7 +669,7 @@ class DAGScheduler(

       case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
         // Mark the stage that the reducer was in as unrunnable
-        val failedStage = idToStage(task.stageId)
+        val failedStage = stageIdToStage(task.stageId)
         running -= failedStage
         failed += failedStage
         // TODO: Cancel running tasks in the stage
@@ -697,7 +697,7 @@ class DAGScheduler(

       case other =>
         // Unrecognized failure - abort all jobs depending on this stage
-        abortStage(idToStage(task.stageId), task + " failed: " + other)
+        abortStage(stageIdToStage(task.stageId), task + " failed: " + other)
     }
   }

@@ -718,7 +718,7 @@ class DAGScheduler(
     for ((shuffleId, stage) <- shuffleToMapStage) {
       stage.removeOutputsOnExecutor(execId)
       val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
-      mapOutputTracker.registerMapOutputs(shuffleId, locs, true)
+      mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
     }
     if (shuffleToMapStage.isEmpty) {
       mapOutputTracker.incrementEpoch()
@@ -750,7 +750,7 @@ class DAGScheduler(
       val error = new SparkException("Job failed: " + reason)
       job.listener.jobFailed(error)
       listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
-      idToActiveJob -= resultStage.priority
+      idToActiveJob -= resultStage.jobId
       activeJobs -= job
       resultStageToJob -= resultStage
     }
@@ -774,7 +774,7 @@ class DAGScheduler(
       for (dep <- rdd.dependencies) {
         dep match {
           case shufDep: ShuffleDependency[_,_] =>
-            val mapStage = getShuffleMapStage(shufDep, stage.priority)
+            val mapStage = getShuffleMapStage(shufDep, stage.jobId)
             if (!mapStage.isAvailable) {
               visitedStages += mapStage
               visit(mapStage.rdd)
@@ -812,13 +812,13 @@ class DAGScheduler(
       }
       case _ =>
     })
-    return Nil
+    Nil
   }

   private def cleanup(cleanupTime: Long) {
-    var sizeBefore = idToStage.size
-    idToStage.clearOldValues(cleanupTime)
-    logInfo("idToStage " + sizeBefore + " --> " + idToStage.size)
+    var sizeBefore = stageIdToStage.size
+    stageIdToStage.clearOldValues(cleanupTime)
+    logInfo("stageIdToStage " + sizeBefore + " --> " + stageIdToStage.size)

     sizeBefore = shuffleToMapStage.size
     shuffleToMapStage.clearOldValues(cleanupTime)
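
The renamed doc comments above rest on two invariants that make jobId a valid stand-in for the old priority: nextJobId is an AtomicInteger, so IDs increase monotonically across jobs, and getShuffleMapStage reuses a shuffle stage created by an earlier job, so a shared stage keeps the lower jobId of the job that first submitted it. A minimal standalone sketch of those two properties (JobIdSketch, MiniStage, and getOrCreateShuffleStage are hypothetical stand-ins, not Spark classes):

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

object JobIdSketch {
  // Monotonically increasing job IDs, as in DAGScheduler.nextJobId.
  val nextJobId = new AtomicInteger(0)

  case class MiniStage(shuffleId: Int, jobId: Int)
  val shuffleToStage = mutable.Map[Int, MiniStage]()

  // Reuse an existing stage for the shuffle; otherwise create one tagged
  // with the submitting job's ID. A reused stage keeps its lower jobId.
  def getOrCreateShuffleStage(shuffleId: Int, jobId: Int): MiniStage =
    shuffleToStage.getOrElseUpdate(shuffleId, MiniStage(shuffleId, jobId))

  def main(args: Array[String]): Unit = {
    val job0 = nextJobId.getAndIncrement()   // 0
    val job1 = nextJobId.getAndIncrement()   // 1
    getOrCreateShuffleStage(7, job0)
    // The later job depends on the same shuffle and reuses job 0's stage:
    assert(getOrCreateShuffleStage(7, job1).jobId == job0)
  }
}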

core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends
   })

   metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
-    override def getValue: Int = dagScheduler.nextRunId.get()
+    override def getValue: Int = dagScheduler.nextJobId.get()
   })

   metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {

core/src/main/scala/spark/scheduler/JobLogger.scala

Lines changed: 12 additions & 12 deletions
@@ -102,7 +102,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))

   protected def buildJobDep(jobID: Int, stage: Stage) {
-    if (stage.priority == jobID) {
+    if (stage.jobId == jobID) {
       jobIDToStages.get(jobID) match {
         case Some(stageList) => stageList += stage
         case None => val stageList = new ListBuffer[Stage]
@@ -178,12 +178,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }else{
       stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
     }
-    if (stage.priority == jobID) {
+    if (stage.jobId == jobID) {
       jobLogInfo(jobID, indentString(indent) + stageInfo, false)
       recordRddInStageGraph(jobID, stage.rdd, indent)
       stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
     } else
-      jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false)
+      jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
   }

   // Record task metrics into job log files
@@ -260,16 +260,16 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {

   override def onJobEnd(jobEnd: SparkListenerJobEnd) {
     val job = jobEnd.job
-    var info = "JOB_ID=" + job.runId
+    var info = "JOB_ID=" + job.jobId
     jobEnd.jobResult match {
       case JobSucceeded => info += " STATUS=SUCCESS"
       case JobFailed(exception, _) =>
         info += " STATUS=FAILED REASON="
         exception.getMessage.split("\\s+").foreach(info += _ + "_")
       case _ =>
     }
-    jobLogInfo(job.runId, info.substring(0, info.length - 1).toUpperCase)
-    closeLogWriter(job.runId)
+    jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase)
+    closeLogWriter(job.jobId)
   }

   protected def recordJobProperties(jobID: Int, properties: Properties) {
@@ -282,11 +282,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   override def onJobStart(jobStart: SparkListenerJobStart) {
     val job = jobStart.job
     val properties = jobStart.properties
-    createLogWriter(job.runId)
-    recordJobProperties(job.runId, properties)
-    buildJobDep(job.runId, job.finalStage)
-    recordStageDep(job.runId)
-    recordStageDepGraph(job.runId, job.finalStage)
-    jobLogInfo(job.runId, "JOB_ID=" + job.runId + " STATUS=STARTED")
+    createLogWriter(job.jobId)
+    recordJobProperties(job.jobId, properties)
+    buildJobDep(job.jobId, job.finalStage)
+    recordStageDep(job.jobId)
+    recordStageDepGraph(job.jobId, job.finalStage)
+    jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
   }
 }
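
The stage.jobId == jobID checks above encode an ownership rule: a stage belongs to the job that first submitted it, so when a later job's stage graph reaches a stage it merely reuses, JobLogger records only a JOB_ID cross-reference rather than logging the stage in full. A small sketch of that rule (MiniStage and describeStage are hypothetical, not JobLogger methods):

object StageOwnershipSketch {
  case class MiniStage(id: Int, jobId: Int)

  def describeStage(stage: MiniStage, currentJobId: Int): String =
    if (stage.jobId == currentJobId)
      "STAGE_ID=" + stage.id                             // owned: log in full
    else
      "STAGE_ID=" + stage.id + " JOB_ID=" + stage.jobId  // reused: cross-reference only

  def main(args: Array[String]): Unit = {
    // A stage first submitted by job 0, encountered while logging job 1:
    println(describeStage(MiniStage(3, 0), currentJobId = 1))  // STAGE_ID=3 JOB_ID=0
  }
}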

core/src/main/scala/spark/scheduler/Stage.scala

Lines changed: 4 additions & 3 deletions
@@ -33,15 +33,16 @@ import spark.storage.BlockManagerId
  * initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
  * that each output partition is on.
  *
- * Each Stage also has a priority, which is (by default) based on the job it was submitted in.
- * This allows Stages from earlier jobs to be computed first or recovered faster on failure.
+ * Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
+ * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
+ * faster on failure.
  */
 private[spark] class Stage(
     val id: Int,
     val rdd: RDD[_],
     val shuffleDep: Option[ShuffleDependency[_,_]],  // Output shuffle if stage is a map stage
     val parents: List[Stage],
-    val priority: Int,
+    val jobId: Int,
     callSite: Option[String])
   extends Logging {

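
To illustrate the FIFO behavior the revised comment describes, here is a minimal sketch (MiniStage is a hypothetical stand-in for Stage) of the sortBy(_.jobId) pattern that resubmitFailedStages and submitWaitingStages use: stages from earlier jobs are submitted first, so earlier jobs recover first.

object FifoRecoverySketch {
  case class MiniStage(id: Int, jobId: Int)

  def main(args: Array[String]): Unit = {
    val failed = Seq(MiniStage(5, 2), MiniStage(3, 0), MiniStage(4, 1))
    // Same pattern as DAGScheduler's failed2.sortBy(_.jobId):
    val resubmitOrder = failed.sortBy(_.jobId).map(_.id)
    println(resubmitOrder)  // List(3, 4, 5): the stage from job 0 goes first
  }
}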