
Commit 0b448df

kayousterhout authored and pwendell committed
Merge pull request apache#450 from kayousterhout/fetch_failures. Closes apache#450.
Only run ResubmitFailedStages event after a fetch fails

Previously, the ResubmitFailedStages event was called every 200 milliseconds, leading to a lot of unnecessary event processing and clogged DAGScheduler logs.

Author: Kay Ousterhout <[email protected]>

== Merge branch commits ==

commit e603784b3a562980e6f1863845097effe2129d3b
Author: Kay Ousterhout <[email protected]>
Date: Wed Feb 5 11:34:41 2014 -0800

    Re-add check for empty set of failed stages

commit d258f0ef50caff4bbb19fb95a6b82186db1935bf
Author: Kay Ousterhout <[email protected]>
Date: Wed Jan 15 23:35:41 2014 -0800

    Only run ResubmitFailedStages event after a fetch fails

    Previously, the ResubmitFailedStages event was called every 200 milliseconds, leading to a lot of unnecessary event processing and clogged DAGScheduler logs.
1 parent 18ad59e commit 0b448df

File tree

1 file changed: +11 -22 lines changed


core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 11 additions & 22 deletions
@@ -155,7 +155,6 @@ class DAGScheduler(
   val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures
   // Missing tasks from each stage
   val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]]
-  var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits
 
   val activeJobs = new HashSet[ActiveJob]
   val resultStageToJob = new HashMap[Stage, ActiveJob]
@@ -176,22 +175,6 @@ class DAGScheduler(
    */
   def start() {
     eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
-      /**
-       * A handle to the periodical task, used to cancel the task when the actor is stopped.
-       */
-      var resubmissionTask: Cancellable = _
-
-      override def preStart() {
-        import context.dispatcher
-        /**
-         * A message is sent to the actor itself periodically to remind the actor to resubmit failed
-         * stages. In this way, stage resubmission can be done within the same thread context of
-         * other event processing logic to avoid unnecessary synchronization overhead.
-         */
-        resubmissionTask = context.system.scheduler.schedule(
-          RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT, self, ResubmitFailedStages)
-      }
-
       /**
        * The main event loop of the DAG scheduler.
        */
@@ -207,7 +190,6 @@ class DAGScheduler(
           if (!processEvent(event)) {
             submitWaitingStages()
           } else {
-            resubmissionTask.cancel()
             context.stop(self)
           }
       }
@@ -620,6 +602,8 @@ class DAGScheduler(
 
       case ResubmitFailedStages =>
         if (failed.size > 0) {
+          // Failed stages may be removed by job cancellation, so failed might be empty even if
+          // the ResubmitFailedStages event has been scheduled.
          resubmitFailedStages()
        }
 
@@ -926,7 +910,6 @@ class DAGScheduler(
         // Mark the stage that the reducer was in as unrunnable
         val failedStage = stageIdToStage(task.stageId)
         running -= failedStage
-        failed += failedStage
         // TODO: Cancel running tasks in the stage
         logInfo("Marking " + failedStage + " (" + failedStage.name +
           ") for resubmision due to a fetch failure")
@@ -938,10 +921,16 @@ class DAGScheduler(
         }
         logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
           "); marking it for resubmission")
+        if (failed.isEmpty && eventProcessActor != null) {
+          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+          // in that case the event will already have been scheduled. eventProcessActor may be
+          // null during unit tests.
+          import env.actorSystem.dispatcher
+          env.actorSystem.scheduler.scheduleOnce(
+            RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
+        }
+        failed += failedStage
         failed += mapStage
-        // Remember that a fetch failed now; this is used to resubmit the broken
-        // stages later, after a small wait (to give other tasks the chance to fail)
-        lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
           handleExecutorLost(bmAddress.executorId, Some(task.epoch))
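
For readers less familiar with the Akka scheduling calls involved, here is a minimal, self-contained sketch of the pattern this commit moves to: instead of a fixed-interval timer firing ResubmitFailedStages every RESUBMIT_TIMEOUT, a one-shot scheduleOnce is issued only when the first fetch failure is recorded, and later failures piggyback on the already-scheduled event. This is an illustrative standalone actor, not Spark's DAGScheduler; the FetchFailed message, stage names, and the 200 ms timeout value are assumptions made for the example.

// Minimal illustrative sketch, NOT Spark's DAGScheduler. It shows the classic Akka
// pattern this commit switches to: schedule a single ResubmitFailedStages message
// only when the first failure arrives, instead of running a fixed-interval timer.
import scala.concurrent.duration._
import akka.actor.{Actor, ActorSystem, Props}

case object ResubmitFailedStages                 // same event name as in the diff
case class FetchFailed(stage: String)            // hypothetical stand-in for Spark's event

class ResubmitSketchActor extends Actor {
  import context.dispatcher                      // ExecutionContext for the scheduler calls
  private val RESUBMIT_TIMEOUT = 200.milliseconds
  private var failed = Set.empty[String]         // stages waiting to be resubmitted

  // Old approach (removed by the commit): fire every RESUBMIT_TIMEOUT regardless of failures.
  // context.system.scheduler.schedule(RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT, self, ResubmitFailedStages)

  def receive: Receive = {
    case FetchFailed(stage) =>
      // New approach: only the first failure schedules a one-shot resubmit; later
      // failures are batched into the event that is already scheduled.
      if (failed.isEmpty) {
        context.system.scheduler.scheduleOnce(RESUBMIT_TIMEOUT, self, ResubmitFailedStages)
      }
      failed += stage

    case ResubmitFailedStages =>
      // failed can be empty here if the stages were cancelled in the meantime.
      if (failed.nonEmpty) {
        println("Resubmitting stages: " + failed.mkString(", "))
        failed = Set.empty
      }
  }
}

object ResubmitSketch extends App {
  val system = ActorSystem("resubmit-sketch")
  val actor = system.actorOf(Props[ResubmitSketchActor], "sketch")
  actor ! FetchFailed("shuffle-stage-3")
  actor ! FetchFailed("shuffle-stage-4")         // reuses the already-scheduled event
  Thread.sleep(500)                              // let the one-shot resubmit fire
  system.terminate()
}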

0 commit comments
