core/src/main/scala/org/apache/spark/SparkContext.scala (12 changes: 4 additions & 8 deletions)
@@ -292,13 +292,11 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
   private val heartbeatReceiver = env.actorSystem.actorOf(
     Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
 
   @volatile private[spark] var dagScheduler: DAGScheduler = _
-  try {
-    dagScheduler = new DAGScheduler(this)
-  } catch {
-    case e: Exception => throw
-      new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
-  }
+
+  // for work around the current MIMA issue
+  dagScheduler = new DAGScheduler(this)
+
   // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
   // constructor
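With the event-processing actor now created and supervised inside DAGSchedulerActorSupervisor (see the DAGScheduler.scala changes below), failures during event processing are handled by the supervisor's OneForOneStrategy, which appears to be why the local try/catch around DAGScheduler construction is dropped here; the committed comment attributes the remaining bare assignment to a workaround for a MIMA (binary-compatibility checker) issue.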
@@ -1035,8 +1033,6 @@ class SparkContext(config: SparkConf) extends Logging {
       listenerBus.stop()
       eventLogger.foreach(_.stop())
       logInfo("Successfully stopped SparkContext")
-    } else {
-      logInfo("SparkContext already stopped")
     }
   }

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (32 changes: 18 additions & 14 deletions)
@@ -138,12 +138,12 @@ class DAGScheduler(

   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
-    eventProcessActor ! BeginEvent(task, taskInfo)
+    dagSchedulerActorSupervisor ! BeginEvent(task, taskInfo)
   }
 
   // Called to report that a task has completed and results are being fetched remotely.
   def taskGettingResult(taskInfo: TaskInfo) {
-    eventProcessActor ! GettingResultEvent(taskInfo)
+    dagSchedulerActorSupervisor ! GettingResultEvent(taskInfo)
   }
 
   // Called by TaskScheduler to report task completions or failures.
@@ -154,7 +154,8 @@ class DAGScheduler(
       accumUpdates: Map[Long, Any],
       taskInfo: TaskInfo,
       taskMetrics: TaskMetrics) {
-    eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
+    dagSchedulerActorSupervisor !
+      CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
   }
 
   /**
@@ -176,18 +177,18 @@

   // Called by TaskScheduler when an executor fails.
   def executorLost(execId: String) {
-    eventProcessActor ! ExecutorLost(execId)
+    dagSchedulerActorSupervisor ! ExecutorLost(execId)
   }
 
   // Called by TaskScheduler when a host is added
   def executorAdded(execId: String, host: String) {
-    eventProcessActor ! ExecutorAdded(execId, host)
+    dagSchedulerActorSupervisor ! ExecutorAdded(execId, host)
   }
 
   // Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
   // cancellation of the job itself.
   def taskSetFailed(taskSet: TaskSet, reason: String) {
-    eventProcessActor ! TaskSetFailed(taskSet, reason)
+    dagSchedulerActorSupervisor ! TaskSetFailed(taskSet, reason)
   }
 
   private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
@@ -493,7 +494,7 @@ class DAGScheduler(
     assert(partitions.size > 0)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
-    eventProcessActor ! JobSubmitted(
+    dagSchedulerActorSupervisor ! JobSubmitted(
       jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
     waiter
   }
@@ -534,7 +535,7 @@ class DAGScheduler(
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.partitions.size).toArray
     val jobId = nextJobId.getAndIncrement()
-    eventProcessActor ! JobSubmitted(
+    dagSchedulerActorSupervisor ! JobSubmitted(
       jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties)
     listener.awaitResult() // Will throw an exception if the job fails
   }
@@ -544,19 +545,19 @@
    */
   def cancelJob(jobId: Int) {
     logInfo("Asked to cancel job " + jobId)
-    eventProcessActor ! JobCancelled(jobId)
+    dagSchedulerActorSupervisor ! JobCancelled(jobId)
   }
 
   def cancelJobGroup(groupId: String) {
     logInfo("Asked to cancel job group " + groupId)
-    eventProcessActor ! JobGroupCancelled(groupId)
+    dagSchedulerActorSupervisor ! JobGroupCancelled(groupId)
   }
 
   /**
    * Cancel all jobs that are running or waiting in the queue.
    */
   def cancelAllJobs() {
-    eventProcessActor ! AllJobsCancelled
+    dagSchedulerActorSupervisor ! AllJobsCancelled
   }
 
   private[scheduler] def doCancelAllJobs() {
@@ -572,7 +573,7 @@
    * Cancel all jobs associated with a running or scheduled stage.
    */
   def cancelStage(stageId: Int) {
-    eventProcessActor ! StageCancelled(stageId)
+    dagSchedulerActorSupervisor ! StageCancelled(stageId)
   }
 
   /**
@@ -1063,7 +1064,6 @@ class DAGScheduler(
             markStageAsFinished(failedStage, Some("Fetch failure"))
             runningStages -= failedStage
           }
-
           if (failedStages.isEmpty && eventProcessActor != null) {
             // Don't schedule an event to resubmit failed stages if failed isn't empty, because
             // in that case the event will already have been scheduled. eventProcessActor may be
Contributor (review comment): Comment needs update

@@ -1073,7 +1073,7 @@
           logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
             s"$failedStage (${failedStage.name}) due to fetch failure")
           env.actorSystem.scheduler.scheduleOnce(
-            RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
+            RESUBMIT_TIMEOUT, dagSchedulerActorSupervisor, ResubmitFailedStages)
         }
         failedStages += failedStage
         failedStages += mapStage
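The resubmission above does not block a thread; it asks Akka's scheduler to deliver a single ResubmitFailedStages message to the supervisor after RESUBMIT_TIMEOUT. A self-contained sketch of that scheduleOnce pattern (the actor, the 200.millis delay, and the string message are placeholders for this illustration):

    import scala.concurrent.duration._
    import akka.actor._

    object ScheduleOnceSketch extends App {
      val system = ActorSystem("sketch")
      import system.dispatcher // ExecutionContext required by scheduleOnce

      val receiver = system.actorOf(Props(new Actor {
        def receive = { case msg => println(s"received $msg after the delay") }
      }))

      // Deliver one message after a fixed delay; 200.millis stands in for
      // RESUBMIT_TIMEOUT here.
      system.scheduler.scheduleOnce(200.millis, receiver, "ResubmitFailedStages")

      // Shut the demo down once the message has had time to arrive.
      system.scheduler.scheduleOnce(500.millis)(system.shutdown())
    }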
@@ -1330,6 +1330,9 @@ class DAGScheduler(
 private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
   extends Actor with Logging {
 
+  private val eventProcessActor: ActorRef =
+    context.actorOf(Props(new DAGSchedulerEventProcessActor(dagScheduler)))
+
   override val supervisorStrategy =
     OneForOneStrategy() {
       case x: Exception =>
@@ -1345,6 +1348,7 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)

   def receive = {
     case p: Props => sender ! context.actorOf(p)
+    case msg: DAGSchedulerEvent => eventProcessActor ! msg
     case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
   }
 }
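Taken together, the changes make the supervisor the single entry point for DAGScheduler events: it creates the event-processing actor as a child, forwards every DAGSchedulerEvent to it, and applies its OneForOneStrategy when the child throws (the strategy body is collapsed in this diff). A minimal, self-contained sketch of the forward-through-a-supervisor pattern, with hypothetical stand-ins for the Spark types:

    import akka.actor._
    import akka.actor.SupervisorStrategy.Restart

    // Hypothetical stand-ins for DAGSchedulerEvent and its processor.
    sealed trait Event
    case class JobSubmitted(jobId: Int) extends Event

    class EventProcessor extends Actor with ActorLogging {
      def receive = {
        case JobSubmitted(jobId) => log.info(s"processing job $jobId")
      }
    }

    // Mirrors DAGSchedulerActorSupervisor: owns the processor as a child,
    // forwards events to it, and decides what happens when it fails.
    class Supervisor extends Actor with ActorLogging {
      private val eventProcessActor: ActorRef =
        context.actorOf(Props(new EventProcessor), "eventProcessor")

      override val supervisorStrategy =
        OneForOneStrategy() {
          // Restart is illustrative; the real strategy's body is not shown
          // in this diff.
          case _: Exception => Restart
        }

      def receive = {
        case msg: Event => eventProcessActor ! msg
        case _          => log.warning("received unknown message in Supervisor")
      }
    }

    object SupervisorDemo extends App {
      val system = ActorSystem("demo")
      val supervisor = system.actorOf(Props(new Supervisor), "supervisor")
      supervisor ! JobSubmitted(1) // callers only ever talk to the supervisor
    }

One consequence of this design is that callers never hold a reference to the event-processing actor itself, so it can be restarted or replaced by the supervisor without invalidating anything on the caller's side.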