Skip to content

Commit dabbefe

Browse files
allenmaGitHub Enterprise
authored andcommitted
[CARMEL-6086] DAGScheduler exit because of jobId not exist (#1008)
1 parent 396aa51 commit dabbefe

File tree

2 files changed

+11
-7
lines changed

2 files changed

+11
-7
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,7 @@ private[spark] class DAGScheduler(
722722
* @param job The job whose state to cleanup.
723723
*/
724724
private def cleanupStateForJobAndIndependentStages(job: ActiveJob): Unit = {
725+
logInfo(s"Clean up job:${job.jobId} state")
725726
val registeredStages = jobIdToStageIds.get(job.jobId)
726727
if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
727728
logError("No stages registered for job " + job.jobId)
@@ -1452,19 +1453,21 @@ private[spark] class DAGScheduler(
14521453
eventProcessLoop.post(SubmitMissingTask(
14531454
stage,
14541455
jobId,
1456+
properties,
14551457
partitionsToCompute,
14561458
taskIdToLocations,
14571459
taskBinary,
14581460
partitions))
14591461
} else {
14601462
handleSubmitMissingTask(
14611463
SubmitMissingTask(
1462-
stage,
1463-
jobId,
1464-
partitionsToCompute,
1465-
taskIdToLocations,
1466-
taskBinary,
1467-
partitions))
1464+
stage,
1465+
jobId,
1466+
properties,
1467+
partitionsToCompute,
1468+
taskIdToLocations,
1469+
taskBinary,
1470+
partitions))
14681471
}
14691472
}
14701473
}
@@ -1498,7 +1501,7 @@ private[spark] class DAGScheduler(
14981501
}
14991502
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
15001503
// with this Stage
1501-
val properties = jobIdToActiveJob(missingTask.jobId).properties
1504+
val properties = missingTask.properties
15021505

15031506
val tasks: Seq[Task[_]] = try {
15041507
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent
106106
private[scheduler] case class SubmitMissingTask(
107107
stage: Stage,
108108
jobId: Int,
109+
properties: Properties,
109110
partitionsToCompute: Seq[Int],
110111
taskIdToLocations: scala.collection.Map[Int, Seq[TaskLocation]],
111112
taskBinary: Broadcast[Array[Byte]],

0 commit comments

Comments
 (0)