diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4565832334420..de4c70ebba1c4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -292,13 +292,11 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master) private val heartbeatReceiver = env.actorSystem.actorOf( Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver") + @volatile private[spark] var dagScheduler: DAGScheduler = _ - try { - dagScheduler = new DAGScheduler(this) - } catch { - case e: Exception => throw - new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage)) - } + + // for work around the current MIMA issue + dagScheduler = new DAGScheduler(this) // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's // constructor @@ -1035,8 +1033,6 @@ class SparkContext(config: SparkConf) extends Logging { listenerBus.stop() eventLogger.foreach(_.stop()) logInfo("Successfully stopped SparkContext") - } else { - logInfo("SparkContext already stopped") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f81fa6d8089fc..8213c0524de54 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -138,12 +138,12 @@ class DAGScheduler( // Called by TaskScheduler to report task's starting. def taskStarted(task: Task[_], taskInfo: TaskInfo) { - eventProcessActor ! BeginEvent(task, taskInfo) + dagSchedulerActorSupervisor ! BeginEvent(task, taskInfo) } // Called to report that a task has completed and results are being fetched remotely. def taskGettingResult(taskInfo: TaskInfo) { - eventProcessActor ! GettingResultEvent(taskInfo) + dagSchedulerActorSupervisor ! GettingResultEvent(taskInfo) } // Called by TaskScheduler to report task completions or failures. @@ -154,7 +154,8 @@ class DAGScheduler( accumUpdates: Map[Long, Any], taskInfo: TaskInfo, taskMetrics: TaskMetrics) { - eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics) + dagSchedulerActorSupervisor ! + CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics) } /** @@ -176,18 +177,18 @@ class DAGScheduler( // Called by TaskScheduler when an executor fails. def executorLost(execId: String) { - eventProcessActor ! ExecutorLost(execId) + dagSchedulerActorSupervisor ! ExecutorLost(execId) } // Called by TaskScheduler when a host is added def executorAdded(execId: String, host: String) { - eventProcessActor ! ExecutorAdded(execId, host) + dagSchedulerActorSupervisor ! ExecutorAdded(execId, host) } // Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or // cancellation of the job itself. def taskSetFailed(taskSet: TaskSet, reason: String) { - eventProcessActor ! TaskSetFailed(taskSet, reason) + dagSchedulerActorSupervisor ! TaskSetFailed(taskSet, reason) } private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = { @@ -493,7 +494,7 @@ class DAGScheduler( assert(partitions.size > 0) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) - eventProcessActor ! JobSubmitted( + dagSchedulerActorSupervisor ! JobSubmitted( jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties) waiter } @@ -534,7 +535,7 @@ class DAGScheduler( val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val partitions = (0 until rdd.partitions.size).toArray val jobId = nextJobId.getAndIncrement() - eventProcessActor ! JobSubmitted( + dagSchedulerActorSupervisor ! JobSubmitted( jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties) listener.awaitResult() // Will throw an exception if the job fails } @@ -544,19 +545,19 @@ class DAGScheduler( */ def cancelJob(jobId: Int) { logInfo("Asked to cancel job " + jobId) - eventProcessActor ! JobCancelled(jobId) + dagSchedulerActorSupervisor ! JobCancelled(jobId) } def cancelJobGroup(groupId: String) { logInfo("Asked to cancel job group " + groupId) - eventProcessActor ! JobGroupCancelled(groupId) + dagSchedulerActorSupervisor ! JobGroupCancelled(groupId) } /** * Cancel all jobs that are running or waiting in the queue. */ def cancelAllJobs() { - eventProcessActor ! AllJobsCancelled + dagSchedulerActorSupervisor ! AllJobsCancelled } private[scheduler] def doCancelAllJobs() { @@ -572,7 +573,7 @@ class DAGScheduler( * Cancel all jobs associated with a running or scheduled stage. */ def cancelStage(stageId: Int) { - eventProcessActor ! StageCancelled(stageId) + dagSchedulerActorSupervisor ! StageCancelled(stageId) } /** @@ -1063,7 +1064,6 @@ class DAGScheduler( markStageAsFinished(failedStage, Some("Fetch failure")) runningStages -= failedStage } - if (failedStages.isEmpty && eventProcessActor != null) { // Don't schedule an event to resubmit failed stages if failed isn't empty, because // in that case the event will already have been scheduled. eventProcessActor may be @@ -1073,7 +1073,7 @@ class DAGScheduler( logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + s"$failedStage (${failedStage.name}) due to fetch failure") env.actorSystem.scheduler.scheduleOnce( - RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages) + RESUBMIT_TIMEOUT, dagSchedulerActorSupervisor, ResubmitFailedStages) } failedStages += failedStage failedStages += mapStage @@ -1330,6 +1330,9 @@ class DAGScheduler( private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) extends Actor with Logging { + private val eventProcessActor: ActorRef = + context.actorOf(Props(new DAGSchedulerEventProcessActor(dagScheduler))) + override val supervisorStrategy = OneForOneStrategy() { case x: Exception => @@ -1345,6 +1348,7 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) def receive = { case p: Props => sender ! context.actorOf(p) + case msg: DAGSchedulerEvent => eventProcessActor ! msg case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor") } }