From 810efd809b589bc9d9dd2d97a71f656ef2c7d6ec Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 9 Apr 2014 02:21:15 -0700 Subject: [PATCH 01/19] akka solution --- .../scala/org/apache/spark/SparkContext.scala | 9 ++- .../apache/spark/scheduler/DAGScheduler.scala | 66 +++++++++++++------ .../spark/scheduler/DAGSchedulerSuite.scala | 1 + 3 files changed, 53 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3ddc0d5eeefb8..7bc7a95929c8d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -274,10 +274,10 @@ class SparkContext(config: SparkConf) extends Logging { // Create and start the scheduler private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master) - taskScheduler.start() - @volatile private[spark] var dagScheduler = new DAGScheduler(this) - dagScheduler.start() + // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's + // constructor + taskScheduler.start() private[spark] val cleaner: Option[ContextCleaner] = { if (conf.getBoolean("spark.cleaner.referenceTracking", true)) { @@ -1007,6 +1007,9 @@ class SparkContext(config: SparkConf) extends Logging { partitions: Seq[Int], allowLocal: Boolean, resultHandler: (Int, U) => Unit) { + if (dagScheduler == null) { + throw new SparkException("SparkContext has been shutdown") + } partitions.foreach{ p => require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p") } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index c6cbf14e20069..41617aefc87f1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -26,13 +26,14 @@ import scala.concurrent.duration._ import scala.reflect.ClassTag import akka.actor._ +import akka.actor.SupervisorStrategy.Stop import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} import org.apache.spark.rdd.RDD import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId} -import org.apache.spark.util.Utils +import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, Utils} /** * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of @@ -54,6 +55,7 @@ import org.apache.spark.util.Utils */ private[spark] class DAGScheduler( + sc: SparkContext, taskScheduler: TaskScheduler, listenerBus: LiveListenerBus, mapOutputTracker: MapOutputTrackerMaster, @@ -65,6 +67,7 @@ class DAGScheduler( def this(sc: SparkContext, taskScheduler: TaskScheduler) = { this( + sc, taskScheduler, sc.listenerBus, sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], @@ -116,18 +119,36 @@ class DAGScheduler( taskScheduler.setDAGScheduler(this) /** - * Starts the event processing actor. The actor has two responsibilities: - * - * 1. Waits for events like job submission, task finished, task failure etc., and calls - * [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them. - * 2. Schedules a periodical task to resubmit failed stages. 
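For context, here is a minimal standalone sketch of the event-loop actor pattern the original comment above describes (hypothetical event and actor names, assuming Akka 2.x). It is only an illustration; the real actor forwards each DAGSchedulerEvent to processEvent() rather than printing.

import scala.concurrent.duration._
import akka.actor.{Actor, ActorSystem, Props}

sealed trait SchedulerEvent
case class JobSubmittedEvent(jobId: Int) extends SchedulerEvent
case object ResubmitFailedStagesEvent extends SchedulerEvent

class EventLoopActor extends Actor {
  import context.dispatcher
  // Periodically send ourselves a resubmit message; cancelled when the actor stops.
  private val resubmitTask =
    context.system.scheduler.schedule(5.seconds, 5.seconds, self, ResubmitFailedStagesEvent)

  def receive = {
    case JobSubmittedEvent(id)     => println(s"processing job $id")
    case ResubmitFailedStagesEvent => println("resubmitting failed stages")
  }

  override def postStop(): Unit = resubmitTask.cancel()
}

object EventLoopDemo extends App {
  val system = ActorSystem("demo")
  val loop = system.actorOf(Props[EventLoopActor], "eventLoop")
  loop ! JobSubmittedEvent(0)
  Thread.sleep(200)
  system.shutdown()
}
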
- * - * NOTE: the actor cannot be started in the constructor, because the periodical task references - * some internal states of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, thus - * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed. + * Starts the event processing actor within the supervisor. The eventProcessingActor + * waits for events like job submission, task finished, task failure etc., and calls + * [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them. */ - def start() { - eventProcessActor = env.actorSystem.actorOf(Props(new Actor { + env.actorSystem.actorOf(Props(new Actor { + + override val supervisorStrategy = + OneForOneStrategy() { + case x: Exception => { + logError("eventProcesserActor failed due to the error %s; shutting down SparkContext" + .format(x.getMessage)) + doCancelAllJobs() + sc.stop() + Stop + } + } + + // do nothing in supervisor + def receive = { + case _ => + } + + eventProcessActor = context.actorOf(Props(new Actor { + + override def preStart() { + // set DAGScheduler for taskScheduler to ensure eventProcessActor is always + // valid when the messages arrive + taskScheduler.setDAGScheduler(DAGScheduler.this) + } + /** * The main event loop of the DAG scheduler. */ @@ -137,8 +158,8 @@ class DAGScheduler( /** * All events are forwarded to `processEvent()`, so that the event processing logic can - * easily tested without starting a dedicated actor. Please refer to `DAGSchedulerSuite` - * for details. + * be easily tested without starting a dedicated actor. Please refer to + * `DAGSchedulerSuite` for details. */ if (!processEvent(event)) { submitWaitingStages() @@ -147,7 +168,7 @@ class DAGScheduler( } } })) - } + })) // Called by TaskScheduler to report task's starting. def taskStarted(task: Task[_], taskInfo: TaskInfo) { @@ -511,6 +532,14 @@ class DAGScheduler( eventProcessActor ! AllJobsCancelled } + private def doCancelAllJobs() { + // Cancel all running jobs. + runningStages.map(_.jobId).foreach(handleJobCancellation(_, + reason = "as part of cancellation of all jobs")) + activeJobs.clear() // These should already be empty by this point, + jobIdToActiveJob.clear() // but just in case we lost track of some jobs... + } + /** * Cancel all jobs associated with a running or scheduled stage. */ @@ -575,10 +604,7 @@ class DAGScheduler( case AllJobsCancelled => // Cancel all running jobs. - runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId, - "as part of cancellation of all jobs")) - activeJobs.clear() // These should already be empty by this point, - jobIdToActiveJob.clear() // but just in case we lost track of some jobs... + doCancelAllJobs() case ExecutorAdded(execId, host) => handleExecutorAdded(execId, host) @@ -821,7 +847,6 @@ class DAGScheduler( */ private def handleTaskCompletion(event: CompletionEvent) { val task = event.task - if (!stageIdToStage.contains(task.stageId)) { // Skip all the actions if the stage has been cancelled. return @@ -1152,6 +1177,7 @@ class DAGScheduler( } def stop() { + logInfo("Stopping DAGScheduler") if (eventProcessActor != null) { eventProcessActor ! 
StopDAGScheduler } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 35a7ac9d049c2..b53ccad40eb4f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -121,6 +121,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont results.clear() mapOutputTracker = new MapOutputTrackerMaster(conf) scheduler = new DAGScheduler( + sc, taskScheduler, sc.listenerBus, mapOutputTracker, From b68c1c732b00005591a132157987545f828174a0 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 4 Apr 2014 19:10:18 -0400 Subject: [PATCH 02/19] refactor DAGScheduler with Akka --- .../scala/org/apache/spark/SparkContext.scala | 7 - .../apache/spark/scheduler/DAGScheduler.scala | 367 ++++++++---------- .../spark/scheduler/DAGSchedulerEvent.scala | 2 - .../spark/scheduler/DAGSchedulerSuite.scala | 60 ++- .../scheduler/TaskSchedulerImplSuite.scala | 1 - 5 files changed, 211 insertions(+), 226 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7bc7a95929c8d..253de6ef95b14 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1010,9 +1010,6 @@ class SparkContext(config: SparkConf) extends Logging { if (dagScheduler == null) { throw new SparkException("SparkContext has been shutdown") } - partitions.foreach{ p => - require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p") - } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite) @@ -1020,7 +1017,6 @@ class SparkContext(config: SparkConf) extends Logging { dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler, localProperties.get) logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") - rdd.doCheckpoint() } /** @@ -1119,9 +1115,6 @@ class SparkContext(config: SparkConf) extends Logging { resultHandler: (Int, U) => Unit, resultFunc: => R): SimpleFutureAction[R] = { - partitions.foreach{ p => - require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p") - } val cleanF = clean(processPartition) val callSite = getCallSite val waiter = dagScheduler.submitJob( diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 41617aefc87f1..2426d321c373d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -27,13 +27,14 @@ import scala.reflect.ClassTag import akka.actor._ import akka.actor.SupervisorStrategy.Stop +import akka.actor.OneForOneStrategy import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} import org.apache.spark.rdd.RDD import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId} -import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, Utils} +import org.apache.spark.util.Utils /** * The high-level scheduling layer that implements stage-oriented scheduling. 
It computes a DAG of @@ -48,19 +49,15 @@ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, Utils} * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task * a small number of times before cancelling the whole stage. * - * THREADING: This class runs all its logic in a single thread executing the run() method, to which - * events are submitted using a synchronized queue (eventQueue). The public API methods, such as - * runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods - * should be private. */ private[spark] class DAGScheduler( - sc: SparkContext, - taskScheduler: TaskScheduler, - listenerBus: LiveListenerBus, + private[scheduler] val sc: SparkContext, + private[scheduler] val taskScheduler: TaskScheduler, + private[scheduler] val listenerBus: LiveListenerBus, mapOutputTracker: MapOutputTrackerMaster, blockManagerMaster: BlockManagerMaster, - env: SparkEnv) + private[scheduler] val env: SparkEnv) extends Logging { import DAGScheduler._ @@ -77,7 +74,7 @@ class DAGScheduler( def this(sc: SparkContext) = this(sc, sc.taskScheduler) - private var eventProcessActor: ActorRef = _ + private[scheduler] var eventProcessActor: ActorRef = _ private[scheduler] val nextJobId = new AtomicInteger(0) private[scheduler] def numTotalJobs: Int = nextJobId.get() @@ -118,57 +115,8 @@ class DAGScheduler( taskScheduler.setDAGScheduler(this) - /** - * Starts the event processing actor within the supervisor. The eventProcessingActor - * waits for events like job submission, task finished, task failure etc., and calls - * [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them. - */ - env.actorSystem.actorOf(Props(new Actor { - - override val supervisorStrategy = - OneForOneStrategy() { - case x: Exception => { - logError("eventProcesserActor failed due to the error %s; shutting down SparkContext" - .format(x.getMessage)) - doCancelAllJobs() - sc.stop() - Stop - } - } - - // do nothing in supervisor - def receive = { - case _ => - } - - eventProcessActor = context.actorOf(Props(new Actor { - - override def preStart() { - // set DAGScheduler for taskScheduler to ensure eventProcessActor is always - // valid when the messages arrive - taskScheduler.setDAGScheduler(DAGScheduler.this) - } - - /** - * The main event loop of the DAG scheduler. - */ - def receive = { - case event: DAGSchedulerEvent => - logTrace("Got event of type " + event.getClass.getName) - - /** - * All events are forwarded to `processEvent()`, so that the event processing logic can - * be easily tested without starting a dedicated actor. Please refer to - * `DAGSchedulerSuite` for details. - */ - if (!processEvent(event)) { - submitWaitingStages() - } else { - context.stop(self) - } - } - })) - })) + private val dagSchedulerActorSupervisor = + env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this))) // Called by TaskScheduler to report task's starting. def taskStarted(task: Task[_], taskInfo: TaskInfo) { @@ -218,7 +166,7 @@ class DAGScheduler( cacheLocs(rdd.id) } - private def clearCacheLocs() { + private[scheduler] def clearCacheLocs() { cacheLocs.clear() } @@ -244,7 +192,7 @@ class DAGScheduler( * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage * directly. 
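For reference, a minimal self-contained sketch (hypothetical names, assuming Akka 2.x) of the parent/child supervision pattern adopted here: a supervisor actor owns the event-processing child and, on any exception, runs cleanup and stops it, mirroring how DAGSchedulerActorSupervisor cancels jobs and stops the SparkContext before stopping its child.

import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Stop

// Child actor that may throw while processing a message.
class Worker extends Actor {
  def receive = {
    case "boom" => throw new RuntimeException("simulated failure")
    case msg    => println(s"handled: $msg")
  }
}

// Parent decides what happens when the child fails: run cleanup, then stop it.
class Supervisor extends Actor {
  override val supervisorStrategy = OneForOneStrategy() {
    case e: Exception =>
      println(s"child failed: ${e.getMessage}; cleaning up")
      Stop
  }
  val worker: ActorRef = context.actorOf(Props[Worker], "worker")
  def receive = {
    case msg => worker forward msg
  }
}

object SupervisionDemo extends App {
  val system = ActorSystem("demo")
  val sup = system.actorOf(Props[Supervisor], "supervisor")
  sup ! "hello"
  sup ! "boom"   // triggers the supervisor strategy
  Thread.sleep(500)
  system.shutdown()
}
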
*/ - private def newStage( + private[scheduler] def newStage( rdd: RDD[_], numTasks: Int, shuffleDep: Option[ShuffleDependency[_,_]], @@ -318,7 +266,7 @@ class DAGScheduler( parents.toList } - private def getMissingParentStages(stage: Stage): List[Stage] = { + private[scheduler] def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] def visit(rdd: RDD[_]) { @@ -457,7 +405,7 @@ class DAGScheduler( { // Check to make sure we are not launching a task on a partition that does not exist. val maxPartitions = rdd.partitions.length - partitions.find(p => p >= maxPartitions).foreach { p => + partitions.find(p => p >= maxPartitions || p < 0).foreach { p => throw new IllegalArgumentException( "Attempting to access a non-existent partition: " + p + ". " + "Total number of partitions: " + maxPartitions) @@ -487,7 +435,7 @@ class DAGScheduler( { val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties) waiter.awaitResult() match { - case JobSucceeded => {} + case JobSucceeded => rdd.doCheckpoint() case JobFailed(exception: Exception) => logInfo("Failed to run " + callSite) throw exception @@ -532,7 +480,7 @@ class DAGScheduler( eventProcessActor ! AllJobsCancelled } - private def doCancelAllJobs() { + private[scheduler] def doCancelAllJobs() { // Cancel all running jobs. runningStages.map(_.jobId).foreach(handleJobCancellation(_, reason = "as part of cancellation of all jobs")) @@ -547,126 +495,6 @@ class DAGScheduler( eventProcessActor ! StageCancelled(stageId) } - /** - * Process one event retrieved from the event processing actor. - * - * @param event The event to be processed. - * @return `true` if we should stop the event loop. - */ - private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = { - event match { - case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => - var finalStage: Stage = null - try { - // New stage creation may throw an exception if, for example, jobs are run on a HadoopRDD - // whose underlying HDFS files have been deleted. - finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite)) - } catch { - case e: Exception => - logWarning("Creating new stage failed due to exception - job: " + jobId, e) - listener.jobFailed(e) - return false - } - val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) - clearCacheLocs() - logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length + - " output partitions (allowLocal=" + allowLocal + ")") - logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") - logInfo("Parents of final stage: " + finalStage.parents) - logInfo("Missing parents: " + getMissingParentStages(finalStage)) - if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { - // Compute very short actions like first() or take() with no parent stages locally. - listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties)) - runLocally(job) - } else { - jobIdToActiveJob(jobId) = job - activeJobs += job - resultStageToJob(finalStage) = job - listenerBus.post( - SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties)) - submitStage(finalStage) - } - - case StageCancelled(stageId) => - handleStageCancellation(stageId) - - case JobCancelled(jobId) => - handleJobCancellation(jobId) - - case JobGroupCancelled(groupId) => - // Cancel all jobs belonging to this job group. 
- // First finds all active jobs with this group id, and then kill stages for them. - val activeInGroup = activeJobs.filter(activeJob => - groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) - val jobIds = activeInGroup.map(_.jobId) - jobIds.foreach(jobId => handleJobCancellation(jobId, - "as part of cancelled job group %s".format(groupId))) - - case AllJobsCancelled => - // Cancel all running jobs. - doCancelAllJobs() - - case ExecutorAdded(execId, host) => - handleExecutorAdded(execId, host) - - case ExecutorLost(execId) => - handleExecutorLost(execId) - - case BeginEvent(task, taskInfo) => - for ( - stage <- stageIdToStage.get(task.stageId); - stageInfo <- stageToInfos.get(stage) - ) { - if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 && - !stageInfo.emittedTaskSizeWarning) { - stageInfo.emittedTaskSizeWarning = true - logWarning(("Stage %d (%s) contains a task of very large " + - "size (%d KB). The maximum recommended task size is %d KB.").format( - task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, TASK_SIZE_TO_WARN)) - } - } - listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo)) - - case GettingResultEvent(task, taskInfo) => - listenerBus.post(SparkListenerTaskGettingResult(taskInfo)) - - case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => - val stageId = task.stageId - val taskType = Utils.getFormattedClassName(task) - listenerBus.post(SparkListenerTaskEnd(stageId, taskType, reason, taskInfo, taskMetrics)) - handleTaskCompletion(completion) - - case TaskSetFailed(taskSet, reason) => - stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) } - - case ResubmitFailedStages => - if (failedStages.size > 0) { - // Failed stages may be removed by job cancellation, so failed might be empty even if - // the ResubmitFailedStages event has been scheduled. - resubmitFailedStages() - } - - case StopDAGScheduler => - // Cancel any active jobs - for (job <- activeJobs) { - val error = new SparkException("Job cancelled because SparkContext was shut down") - job.listener.jobFailed(error) - // Tell the listeners that all of the running stages have ended. Don't bother - // cancelling the stages because if the DAG scheduler is stopped, the entire application - // is in the process of getting stopped. - val stageFailedMessage = "Stage cancelled because SparkContext was shut down" - runningStages.foreach { stage => - val info = stageToInfos(stage) - info.stageFailed(stageFailedMessage) - listenerBus.post(SparkListenerStageCompleted(info)) - } - listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error))) - } - return true - } - false - } - /** * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since * the last fetch failure. @@ -704,7 +532,7 @@ class DAGScheduler( * We run the operation in a separate thread just in case it takes a bunch of time, so that we * don't block the DAGScheduler event loop or other concurrent jobs. */ - protected def runLocally(job: ActiveJob) { + protected[scheduler] def runLocally(job: ActiveJob) { logInfo("Computing the requested partition locally") new Thread("Local computation of job " + job.jobId) { override def run() { @@ -757,7 +585,7 @@ class DAGScheduler( } /** Submits stage, but first recursively submits any missing parents. 
*/ - private def submitStage(stage: Stage) { + private[scheduler] def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") @@ -845,7 +673,7 @@ class DAGScheduler( * Responds to a task finishing. This is called inside the event loop so it assumes that it can * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside. */ - private def handleTaskCompletion(event: CompletionEvent) { + private[scheduler] def handleTaskCompletion(event: CompletionEvent) { val task = event.task if (!stageIdToStage.contains(task.stageId)) { // Skip all the actions if the stage has been cancelled. @@ -998,7 +826,8 @@ class DAGScheduler( * Optionally the epoch during which the failure was caught can be passed to avoid allowing * stray fetch failures from possibly retriggering the detection of a node as lost. */ - private def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) { + private[scheduler] def handleExecutorLost(execId: String, + maybeEpoch: Option[Long] = None) { val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { failedEpoch(execId) = currentEpoch @@ -1020,7 +849,7 @@ class DAGScheduler( } } - private def handleExecutorAdded(execId: String, host: String) { + private[scheduler] def handleExecutorAdded(execId: String, host: String) { // remove from failedEpoch(execId) ? if (failedEpoch.contains(execId)) { logInfo("Host added was in lost list earlier: " + host) @@ -1039,7 +868,7 @@ class DAGScheduler( } } - private def handleJobCancellation(jobId: Int, reason: String = "") { + private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") { if (!jobIdToStageIds.contains(jobId)) { logDebug("Trying to cancel unregistered job " + jobId) } else { @@ -1052,7 +881,7 @@ class DAGScheduler( * Aborts all jobs depending on a particular Stage. This is called in response to a task set * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside. */ - private def abortStage(failedStage: Stage, reason: String) { + private[scheduler] def abortStage(failedStage: Stage, reason: String) { if (!stageIdToStage.contains(failedStage.id)) { // Skip all the actions if the stage has been removed. return @@ -1178,13 +1007,159 @@ class DAGScheduler( def stop() { logInfo("Stopping DAGScheduler") - if (eventProcessActor != null) { - eventProcessActor ! StopDAGScheduler + if (dagSchedulerActorSupervisor != null) { + dagSchedulerActorSupervisor ! PoisonPill.getInstance } taskScheduler.stop() } } +private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) + extends Actor with Logging { + + override val supervisorStrategy = + OneForOneStrategy() { + case x: Exception => { + logError("eventProcesserActor failed due to the error %s; shutting down SparkContext" + .format(x.getMessage)) + dagScheduler.doCancelAllJobs() + dagScheduler.sc.stop() + Stop + } + } + + def receive = { + case p: Props => sender ! 
context.actorOf(p) + case _ => + } + + dagScheduler.eventProcessActor = context.actorOf( + Props(new DAGSchedulerEventProcessActor(dagScheduler))) +} + +private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler) + extends Actor with Logging { + + override def preStart() { + // set DAGScheduler for taskScheduler to ensure eventProcessActor is always + // valid when the messages arrive + dagScheduler.taskScheduler.setDAGScheduler(dagScheduler) + } + + /** + * The main event loop of the DAG scheduler. + */ + def receive = { + case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => + var finalStage: Stage = null + try { + // New stage creation may throw an exception if, for example, jobs are run on a + // HadoopRDD whose underlying HDFS files have been deleted. + finalStage = dagScheduler.newStage(rdd, partitions.size, None, jobId, Some(callSite)) + } catch { + case e: Exception => + logWarning("Creating new stage failed due to exception - job: " + jobId, e) + listener.jobFailed(e) + } + val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) + dagScheduler.clearCacheLocs() + logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length + + " output partitions (allowLocal=" + allowLocal + ")") + logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") + logInfo("Parents of final stage: " + finalStage.parents) + logInfo("Missing parents: " + dagScheduler.getMissingParentStages(finalStage)) + if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { + // Compute very short actions like first() or take() with no parent stages locally. + dagScheduler.listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties)) + dagScheduler.runLocally(job) + } else { + dagScheduler.jobIdToActiveJob(jobId) = job + dagScheduler.activeJobs += job + dagScheduler.resultStageToJob(finalStage) = job + dagScheduler.listenerBus.post( + SparkListenerJobStart(job.jobId, dagScheduler.jobIdToStageIds(jobId).toArray, + properties)) + dagScheduler.submitStage(finalStage) + } + + case JobCancelled(jobId) => + dagScheduler.handleJobCancellation(jobId, "part of cancel all jobs") + + case JobGroupCancelled(groupId) => + // Cancel all jobs belonging to this job group. + // First finds all active jobs with this group id, and then kill stages for them. + val activeInGroup = dagScheduler.activeJobs.filter(activeJob => + groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) + val jobIds = activeInGroup.map(_.jobId) + jobIds.foreach(dagScheduler.handleJobCancellation(_, "part of cancel job group")) + + case AllJobsCancelled => + dagScheduler.doCancelAllJobs() + + case ExecutorAdded(execId, host) => + dagScheduler.handleExecutorAdded(execId, host) + + case ExecutorLost(execId) => + dagScheduler.handleExecutorLost(execId) + + case BeginEvent(task, taskInfo) => + for ( + job <- dagScheduler.jobIdToActiveJob.get(task.stageId); + stage <- dagScheduler.stageIdToStage.get(task.stageId); + stageInfo <- dagScheduler.stageToInfos.get(stage) + ) { + if (taskInfo.serializedSize > DAGScheduler.TASK_SIZE_TO_WARN * 1024 && + !stageInfo.emittedTaskSizeWarning) { + stageInfo.emittedTaskSizeWarning = true + logWarning(("Stage %d (%s) contains a task of very large " + + "size (%d KB). 
The maximum recommended task size is %d KB.").format( + task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, + DAGScheduler.TASK_SIZE_TO_WARN)) + } + } + dagScheduler.listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo)) + + case GettingResultEvent(task, taskInfo) => + dagScheduler.listenerBus.post(SparkListenerTaskGettingResult(taskInfo)) + + case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => + val stageId = task.stageId + val taskType = Utils.getFormattedClassName(task) + dagScheduler.listenerBus.post(SparkListenerTaskEnd(stageId, taskType, reason, taskInfo, + taskMetrics)) + dagScheduler.handleTaskCompletion(completion) + + case TaskSetFailed(taskSet, reason) => + dagScheduler.stageIdToStage.get(taskSet.stageId).foreach { + dagScheduler.abortStage(_, reason) } + + case ResubmitFailedStages => + if (dagScheduler.failedStages.size > 0) { + // Failed stages may be removed by job cancellation, so failed might be empty even if + // the ResubmitFailedStages event has been scheduled. + dagScheduler.resubmitFailedStages() + } + } + + override def postStop() { + // Cancel any active jobs + for (job <- dagScheduler.activeJobs) { + val error = new SparkException("Job cancelled because SparkContext was shut down") + job.listener.jobFailed(error) + // Tell the listeners that all of the running stages have ended. Don't bother + // cancelling the stages because if the DAG scheduler is stopped, the entire application + // is in the process of getting stopped. + val stageFailedMessage = "Stage cancelled because SparkContext was shut down" + dagScheduler.runningStages.foreach { stage => + val info = dagScheduler.stageToInfos(stage) + info.stageFailed(stageFailedMessage) + dagScheduler.listenerBus.post(SparkListenerStageCompleted(info)) + } + dagScheduler.listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error))) + } + } +} + private[spark] object DAGScheduler { // The time, in millis, to wait for fetch failure events to stop coming in after one is detected; // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 0800c5684c60f..1556b56920655 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -76,5 +76,3 @@ private[scheduler] case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent - -private[scheduler] case object StopDAGScheduler extends DAGSchedulerEvent diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index b53ccad40eb4f..cace8bcfc6cc6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -20,27 +20,29 @@ package org.apache.spark.scheduler import scala.Tuple2 import scala.collection.mutable.{HashSet, HashMap, Map} import scala.language.reflectiveCalls +import scala.collection.mutable.{HashMap, Map, HashSet} +import akka.actor._ +import akka.testkit.{TestKit, ImplicitSender, TestActorRef} import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.matchers.MustMatchers import org.apache.spark._ import 
org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} -/** - * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler - * rather than spawning an event loop thread as happens in the real code. They use EasyMock - * to mock out two classes that DAGScheduler interacts with: TaskScheduler (to which TaskSets are - * submitted) and BlockManagerMaster (from which cache locations are retrieved and to which dead - * host notifications are sent). In addition, tests may check for side effects on a non-mocked - * MapOutputTracker instance. - * - * Tests primarily consist of running DAGScheduler#processEvent and - * DAGScheduler#submitWaitingStages (via test utility functions like runEvent or respondToTaskSet) - * and capturing the resulting TaskSets from the mock TaskScheduler. - */ -class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { +class BuggyDAGEventProcessActor extends Actor { + val state = 0 + def receive = { + case _ => throw new SparkException("error") + } +} + +class DAGSchedulerSuite + extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuite with MustMatchers + with ImplicitSender with BeforeAndAfter with LocalSparkContext { + val conf = new SparkConf /** Set of TaskSets the DAGScheduler has requested executed. */ val taskSets = scala.collection.mutable.Buffer[TaskSet]() @@ -82,6 +84,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont var mapOutputTracker: MapOutputTrackerMaster = null var scheduler: DAGScheduler = null + var dagEventProcessTestActor: TestActorRef[DAGSchedulerEventProcessActor] = null /** * Set of cache locations to return from our mock BlockManagerMaster. @@ -132,12 +135,19 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont runLocallyWithinThread(job) } } + dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor]( + Props(classOf[DAGSchedulerEventProcessActor], scheduler))(system) } after { scheduler.stop() } + override def afterAll() { + super.afterAll() + TestKit.shutdownActorSystem(system) + } + /** * Type of RDD we use for testing. Note that we should never call the real RDD compute methods. * This is a pair RDD type so it can always be used in ShuffleDependencies. @@ -179,8 +189,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont * DAGScheduler event loop. */ private def runEvent(event: DAGSchedulerEvent) { - assert(!scheduler.processEvent(event)) - scheduler.submitWaitingStages() + dagEventProcessTestActor.receive(event) } /** @@ -210,7 +219,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont listener: JobListener = jobListener): Int = { val jobId = scheduler.nextJobId.getAndIncrement() runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, null, listener)) - return jobId + jobId } /** Sends TaskSetFailed to the scheduler. 
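As an aside, a minimal sketch (hypothetical actor and test harness, assuming akka-testkit 2.x) of the TestActorRef technique the updated suite relies on: messages sent to a TestActorRef are processed synchronously on the calling thread, which is why runEvent can drive DAGSchedulerEventProcessActor and assert on scheduler state immediately afterwards.

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.TestActorRef

class Counter extends Actor {
  var count = 0
  def receive = { case "inc" => count += 1 }
}

object TestActorRefDemo extends App {
  implicit val system = ActorSystem("test")
  // Synchronous delivery: state can be checked right after sending.
  val ref = TestActorRef[Counter](Props[Counter])
  ref ! "inc"
  ref ! "inc"
  assert(ref.underlyingActor.count == 2)
  system.shutdown()
}
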
*/ @@ -224,19 +233,17 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont } test("zero split job") { - val rdd = makeRdd(0, Nil) var numResults = 0 val fakeListener = new JobListener() { override def taskSucceeded(partition: Int, value: Any) = numResults += 1 override def jobFailed(exception: Exception) = throw exception } - submit(rdd, Array(), listener = fakeListener) + submit(makeRdd(0, Nil), Array(), listener = fakeListener) assert(numResults === 0) } test("run trivial job") { - val rdd = makeRdd(1, Nil) - submit(rdd, Array(0)) + submit(makeRdd(1, Nil), Array(0)) complete(taskSets(0), List((Success, 42))) assert(results === Map(0 -> 42)) assertDataStructuresEmpty @@ -530,6 +537,18 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont assertDataStructuresEmpty } + test("DAGSchedulerActorSupervisor closes the SparkContext when EventProcessActor crashes") { + val actorSystem = ActorSystem("test") + val supervisor = actorSystem.actorOf( + Props(classOf[DAGSchedulerActorSupervisor], scheduler), "dagSupervisor") + supervisor ! Props[BuggyDAGEventProcessActor] + val child = expectMsgType[ActorRef] + watch(child) + child ! "hi" + expectMsgPF(){ case Terminated(child) => () } + assert(scheduler.sc.dagScheduler === null) + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. * Note that this checks only the host and not the executor ID. @@ -562,3 +581,4 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont assert(scheduler.waitingStages.isEmpty) } } + diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 2fb750d9ee378..59ded99ba1ac8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -305,7 +305,6 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} override def executorAdded(execId: String, host: String) {} } - // Give zero core offers. Should not generate any tasks val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0), new WorkerOffer("executor1", "host1", 0)) From 228f4b0fbe049358f668c693adce317f3451b26e Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 10 Apr 2014 07:46:34 -0400 Subject: [PATCH 03/19] address comments from Mark fix bug --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 ++---- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 4 ---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 2426d321c373d..db264c5d589b4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1007,9 +1007,7 @@ class DAGScheduler( def stop() { logInfo("Stopping DAGScheduler") - if (dagSchedulerActorSupervisor != null) { - dagSchedulerActorSupervisor ! PoisonPill.getInstance - } + dagSchedulerActorSupervisor ! 
PoisonPill taskScheduler.stop() } } @@ -1083,7 +1081,7 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule } case JobCancelled(jobId) => - dagScheduler.handleJobCancellation(jobId, "part of cancel all jobs") + dagScheduler.handleJobCancellation(jobId) case JobGroupCancelled(groupId) => // Cancel all jobs belonging to this job group. diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index cace8bcfc6cc6..6eedf5d92ff5c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -139,10 +139,6 @@ class DAGSchedulerSuite Props(classOf[DAGSchedulerEventProcessActor], scheduler))(system) } - after { - scheduler.stop() - } - override def afterAll() { super.afterAll() TestKit.shutdownActorSystem(system) From 089bc2f62c2e005515939ea730ac63c77f0d1eac Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 18 Apr 2014 09:04:01 -0400 Subject: [PATCH 04/19] address andrew's comments --- .../org/apache/spark/scheduler/DAGScheduler.scala | 15 ++++++--------- .../spark/scheduler/DAGSchedulerSuite.scala | 8 +++----- .../spark/scheduler/TaskSchedulerImplSuite.scala | 1 + 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index db264c5d589b4..d5305ff2bd396 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -26,8 +26,8 @@ import scala.concurrent.duration._ import scala.reflect.ClassTag import akka.actor._ -import akka.actor.SupervisorStrategy.Stop import akka.actor.OneForOneStrategy +import akka.actor.SupervisorStrategy.Stop import org.apache.spark._ import org.apache.spark.executor.TaskMetrics @@ -113,8 +113,6 @@ class DAGScheduler( // stray messages to detect. private val failedEpoch = new HashMap[String, Long] - taskScheduler.setDAGScheduler(this) - private val dagSchedulerActorSupervisor = env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this))) @@ -826,8 +824,7 @@ class DAGScheduler( * Optionally the epoch during which the failure was caught can be passed to avoid allowing * stray fetch failures from possibly retriggering the detection of a node as lost. */ - private[scheduler] def handleExecutorLost(execId: String, - maybeEpoch: Option[Long] = None) { + private[scheduler] def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) { val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { failedEpoch(execId) = currentEpoch @@ -1028,7 +1025,7 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) def receive = { case p: Props => sender ! 
context.actorOf(p) - case _ => + case _ => logWarning("recevied unknown message in DAGSchedulerActorSupervisor") } dagScheduler.eventProcessActor = context.actorOf( @@ -1061,9 +1058,9 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule } val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) dagScheduler.clearCacheLocs() - logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length + - " output partitions (allowLocal=" + allowLocal + ")") - logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") + logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)". + format(job.jobId, callSite, partitions.length, allowLocal)) + logInfo("Final stage: %s (%s)".format(finalStage, finalStage.name)) logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + dagScheduler.getMissingParentStages(finalStage)) if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 6eedf5d92ff5c..d2e809d80e49c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -20,10 +20,9 @@ package org.apache.spark.scheduler import scala.Tuple2 import scala.collection.mutable.{HashSet, HashMap, Map} import scala.language.reflectiveCalls -import scala.collection.mutable.{HashMap, Map, HashSet} import akka.actor._ -import akka.testkit.{TestKit, ImplicitSender, TestActorRef} +import akka.testkit.{ImplicitSender, TestKit, TestActorRef} import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.matchers.MustMatchers @@ -39,9 +38,8 @@ class BuggyDAGEventProcessActor extends Actor { } } -class DAGSchedulerSuite - extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuite with MustMatchers - with ImplicitSender with BeforeAndAfter with LocalSparkContext { +class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuite + with MustMatchers with ImplicitSender with BeforeAndAfter with LocalSparkContext { val conf = new SparkConf /** Set of TaskSets the DAGScheduler has requested executed. */ diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 59ded99ba1ac8..a8b605c5b212a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -305,6 +305,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} override def executorAdded(execId: String, host: String) {} } + taskScheduler.setDAGScheduler(dagScheduler) // Give zero core offers. Should not generate any tasks val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0), new WorkerOffer("executor1", "host1", 0)) From fdf3b177c94789c3d3fdc514e61b1e66ea65ffd6 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 18 Apr 2014 09:42:55 -0400 Subject: [PATCH 05/19] just to retrigger the test...... 
--- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d5305ff2bd396..d81d01b8e0df2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1060,7 +1060,7 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule dagScheduler.clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)". format(job.jobId, callSite, partitions.length, allowLocal)) - logInfo("Final stage: %s (%s)".format(finalStage, finalStage.name)) + logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + dagScheduler.getMissingParentStages(finalStage)) if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { From a7a2a974a971a1dd3a3169dbd77a0fb3de99e689 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 22 Apr 2014 12:54:06 -0400 Subject: [PATCH 06/19] add StageCancelled message --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d81d01b8e0df2..e3ae0b4c941dc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -854,7 +854,7 @@ class DAGScheduler( } } - private def handleStageCancellation(stageId: Int) { + private[scheduler] def handleStageCancellation(stageId: Int) { if (stageIdToJobIds.contains(stageId)) { val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray jobsThatUseStage.foreach(jobId => { @@ -1077,6 +1077,9 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule dagScheduler.submitStage(finalStage) } + case StageCancelled(stageId) => + dagScheduler.handleStageCancellation(stageId) + case JobCancelled(jobId) => dagScheduler.handleJobCancellation(jobId) From 9dfb0335fa5fb736d8f4675448ef6bc5267b3a15 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 22 Apr 2014 13:54:38 -0400 Subject: [PATCH 07/19] remove unnecessary changes --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e3ae0b4c941dc..f6d01f7bcc741 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -57,7 +57,7 @@ class DAGScheduler( private[scheduler] val listenerBus: LiveListenerBus, mapOutputTracker: MapOutputTrackerMaster, blockManagerMaster: BlockManagerMaster, - private[scheduler] val env: SparkEnv) + env: SparkEnv) extends Logging { import DAGScheduler._ diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index d2e809d80e49c..5637ddaddb725 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -24,7 +24,6 @@ import scala.language.reflectiveCalls import akka.actor._ import akka.testkit.{ImplicitSender, TestKit, TestActorRef} import org.scalatest.{BeforeAndAfter, FunSuite} -import org.scalatest.matchers.MustMatchers import org.apache.spark._ import org.apache.spark.rdd.RDD @@ -39,7 +38,7 @@ class BuggyDAGEventProcessActor extends Actor { } class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuite - with MustMatchers with ImplicitSender with BeforeAndAfter with LocalSparkContext { + with ImplicitSender with BeforeAndAfter with LocalSparkContext { val conf = new SparkConf /** Set of TaskSets the DAGScheduler has requested executed. */ From 5d1636adabf09cd3f007f5d90477a538ee19326d Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 22 Apr 2014 15:07:50 -0400 Subject: [PATCH 08/19] re-trigger the test..... --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f6d01f7bcc741..0aa6792d2dc47 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1140,7 +1140,7 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule } override def postStop() { - // Cancel any active jobs + // Cancel any active jobs in postStop hook for (job <- dagScheduler.activeJobs) { val error = new SparkException("Job cancelled because SparkContext was shut down") job.listener.jobFailed(error) From ac878abd676baf8d85822ffbc8cd6c3ad35945a2 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 22 Apr 2014 16:06:39 -0400 Subject: [PATCH 09/19] typo fix --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 0aa6792d2dc47..6f91404045731 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1014,18 +1014,17 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) override val supervisorStrategy = OneForOneStrategy() { - case x: Exception => { + case x: Exception => logError("eventProcesserActor failed due to the error %s; shutting down SparkContext" .format(x.getMessage)) dagScheduler.doCancelAllJobs() dagScheduler.sc.stop() Stop - } } def receive = { case p: Props => sender ! 
context.actorOf(p) - case _ => logWarning("recevied unknown message in DAGSchedulerActorSupervisor") + case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor") } dagScheduler.eventProcessActor = context.actorOf( From a9eea039ab713682906ab7e77e65ea37c3db41df Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 23 Apr 2014 16:50:25 -0400 Subject: [PATCH 10/19] address Matei's comments --- .../main/scala/org/apache/spark/rdd/RDD.scala | 6 +- .../apache/spark/scheduler/DAGScheduler.scala | 188 ++++++++++-------- 2 files changed, 109 insertions(+), 85 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 891efccf23b6a..e0fbf07fb6187 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1112,9 +1112,9 @@ abstract class RDD[T: ClassTag]( @transient private var doCheckpointCalled = false /** - * Performs the checkpointing of this RDD by saving this. It is called by the DAGScheduler - * after a job using this RDD has completed (therefore the RDD has been materialized and - * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs. + * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD + * has completed (therefore the RDD has been materialized and potentially stored in memory). + * doCheckpoint() is called recursively on the parent RDDs. */ private[spark] def doCheckpoint() { if (!doCheckpointCalled) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 6f91404045731..d88e8acbd72dd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -498,12 +498,16 @@ class DAGScheduler( * the last fetch failure. */ private[scheduler] def resubmitFailedStages() { - logInfo("Resubmitting failed stages") - clearCacheLocs() - val failedStagesCopy = failedStages.toArray - failedStages.clear() - for (stage <- failedStagesCopy.sortBy(_.jobId)) { - submitStage(stage) + if (failedStages.size > 0) { + // Failed stages may be removed by job cancellation, so failed might be empty even if + // the ResubmitFailedStages event has been scheduled. + logInfo("Resubmitting failed stages") + clearCacheLocs() + val failedStagesCopy = failedStages.toArray + failedStages.clear() + for (stage <- failedStagesCopy.sortBy(_.jobId)) { + submitStage(stage) + } } } @@ -582,6 +586,91 @@ class DAGScheduler( } } + private[scheduler] def handleJobGroupCancelled(groupId: String) { + // Cancel all jobs belonging to this job group. + // First finds all active jobs with this group id, and then kill stages for them. + val activeInGroup = activeJobs.filter(activeJob => + groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) + val jobIds = activeInGroup.map(_.jobId) + jobIds.foreach(handleJobCancellation(_, "part of cancel job group")) + } + + private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) { + for (stage <- stageIdToStage.get(task.stageId); stageInfo <- stageToInfos.get(stage)) { + if (taskInfo.serializedSize > DAGScheduler.TASK_SIZE_TO_WARN * 1024 && + !stageInfo.emittedTaskSizeWarning) { + stageInfo.emittedTaskSizeWarning = true + logWarning(("Stage %d (%s) contains a task of very large " + + "size (%d KB). 
The maximum recommended task size is %d KB.").format( + task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, + DAGScheduler.TASK_SIZE_TO_WARN)) + } + } + listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo)) + } + + private[scheduler] def handleTaskSetFailed(taskSet: TaskSet, reason: String) { + stageIdToStage.get(taskSet.stageId).foreach {abortStage(_, reason) } + } + + private[scheduler] def cleanUpAfterSchedulerStop() { + for (job <- activeJobs) { + val error = new SparkException("Job cancelled because SparkContext was shut down") + job.listener.jobFailed(error) + // Tell the listeners that all of the running stages have ended. Don't bother + // cancelling the stages because if the DAG scheduler is stopped, the entire application + // is in the process of getting stopped. + val stageFailedMessage = "Stage cancelled because SparkContext was shut down" + runningStages.foreach { stage => + val info = stageToInfos(stage) + info.stageFailed(stageFailedMessage) + listenerBus.post(SparkListenerStageCompleted(info)) + } + listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error))) + } + } + + private[scheduler] def handleJobSubmitted(jobId: Int, + finalRDD: RDD[_], + func: (TaskContext, Iterator[_]) => _, + partitions: Array[Int], + allowLocal: Boolean, + callSite: String, + listener: JobListener, + properties: Properties = null) { + var finalStage: Stage = null + try { + // New stage creation may throw an exception if, for example, jobs are run on a + // HadoopRDD whose underlying HDFS files have been deleted. + finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite)) + } catch { + case e: Exception => + logWarning("Creating new stage failed due to exception - job: " + jobId, e) + listener.jobFailed(e) + } + if (finalStage != null) { + val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) + clearCacheLocs() + logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)". + format(job.jobId, callSite, partitions.length, allowLocal)) + logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")") + logInfo("Parents of final stage: " + finalStage.parents) + logInfo("Missing parents: " + getMissingParentStages(finalStage)) + if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { + // Compute very short actions like first() or take() with no parent stages locally. + listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties)) + runLocally(job) + } else { + jobIdToActiveJob(jobId) = job + activeJobs += job + resultStageToJob(finalStage) = job + listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, + properties)) + submitStage(finalStage) + } + } + } + /** Submits stage, but first recursively submits any missing parents. */ private[scheduler] def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) @@ -673,6 +762,10 @@ class DAGScheduler( */ private[scheduler] def handleTaskCompletion(event: CompletionEvent) { val task = event.task + val stageId = task.stageId + val taskType = Utils.getFormattedClassName(task) + listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo, + event.taskMetrics)) if (!stageIdToStage.contains(task.stageId)) { // Skip all the actions if the stage has been cancelled. 
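A minimal sketch (hypothetical names) of the refactoring pattern this commit applies: the actor's receive stays a thin dispatch layer that delegates to named handler methods on the scheduler object, so the handlers can be unit-tested without an actor system.

import akka.actor.{Actor, ActorSystem, Props}

sealed trait Event
case class Submit(id: Int) extends Event
case object CancelAll extends Event

// All real logic lives in plain methods that tests can call directly.
class Scheduler {
  def handleSubmit(id: Int): Unit = println(s"submitted $id")
  def handleCancelAll(): Unit = println("cancelled all jobs")
}

// The actor only translates messages into handler calls.
class EventProcessActor(scheduler: Scheduler) extends Actor {
  def receive = {
    case Submit(id) => scheduler.handleSubmit(id)
    case CancelAll  => scheduler.handleCancelAll()
  }
}

object DelegationDemo extends App {
  val scheduler = new Scheduler
  scheduler.handleSubmit(1)               // exercised directly, no Akka needed
  val system = ActorSystem("demo")
  val ref = system.actorOf(Props(new EventProcessActor(scheduler)))
  ref ! Submit(2)
  Thread.sleep(200)
  system.shutdown()
}
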
return @@ -1045,36 +1138,8 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule */ def receive = { case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => - var finalStage: Stage = null - try { - // New stage creation may throw an exception if, for example, jobs are run on a - // HadoopRDD whose underlying HDFS files have been deleted. - finalStage = dagScheduler.newStage(rdd, partitions.size, None, jobId, Some(callSite)) - } catch { - case e: Exception => - logWarning("Creating new stage failed due to exception - job: " + jobId, e) - listener.jobFailed(e) - } - val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) - dagScheduler.clearCacheLocs() - logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)". - format(job.jobId, callSite, partitions.length, allowLocal)) - logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")") - logInfo("Parents of final stage: " + finalStage.parents) - logInfo("Missing parents: " + dagScheduler.getMissingParentStages(finalStage)) - if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { - // Compute very short actions like first() or take() with no parent stages locally. - dagScheduler.listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties)) - dagScheduler.runLocally(job) - } else { - dagScheduler.jobIdToActiveJob(jobId) = job - dagScheduler.activeJobs += job - dagScheduler.resultStageToJob(finalStage) = job - dagScheduler.listenerBus.post( - SparkListenerJobStart(job.jobId, dagScheduler.jobIdToStageIds(jobId).toArray, - properties)) - dagScheduler.submitStage(finalStage) - } + dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, + listener, properties) case StageCancelled(stageId) => dagScheduler.handleStageCancellation(stageId) @@ -1083,12 +1148,7 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule dagScheduler.handleJobCancellation(jobId) case JobGroupCancelled(groupId) => - // Cancel all jobs belonging to this job group. - // First finds all active jobs with this group id, and then kill stages for them. - val activeInGroup = dagScheduler.activeJobs.filter(activeJob => - groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) - val jobIds = activeInGroup.map(_.jobId) - jobIds.foreach(dagScheduler.handleJobCancellation(_, "part of cancel job group")) + dagScheduler.handleJobGroupCancelled(groupId) case AllJobsCancelled => dagScheduler.doCancelAllJobs() @@ -1100,60 +1160,24 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule dagScheduler.handleExecutorLost(execId) case BeginEvent(task, taskInfo) => - for ( - job <- dagScheduler.jobIdToActiveJob.get(task.stageId); - stage <- dagScheduler.stageIdToStage.get(task.stageId); - stageInfo <- dagScheduler.stageToInfos.get(stage) - ) { - if (taskInfo.serializedSize > DAGScheduler.TASK_SIZE_TO_WARN * 1024 && - !stageInfo.emittedTaskSizeWarning) { - stageInfo.emittedTaskSizeWarning = true - logWarning(("Stage %d (%s) contains a task of very large " + - "size (%d KB). 
The maximum recommended task size is %d KB.").format( - task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, - DAGScheduler.TASK_SIZE_TO_WARN)) - } - } - dagScheduler.listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo)) + dagScheduler.handleBeginEvent(task, taskInfo) case GettingResultEvent(task, taskInfo) => dagScheduler.listenerBus.post(SparkListenerTaskGettingResult(taskInfo)) case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => - val stageId = task.stageId - val taskType = Utils.getFormattedClassName(task) - dagScheduler.listenerBus.post(SparkListenerTaskEnd(stageId, taskType, reason, taskInfo, - taskMetrics)) dagScheduler.handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason) => - dagScheduler.stageIdToStage.get(taskSet.stageId).foreach { - dagScheduler.abortStage(_, reason) } + dagScheduler.handleTaskSetFailed(taskSet, reason) case ResubmitFailedStages => - if (dagScheduler.failedStages.size > 0) { - // Failed stages may be removed by job cancellation, so failed might be empty even if - // the ResubmitFailedStages event has been scheduled. - dagScheduler.resubmitFailedStages() - } + dagScheduler.resubmitFailedStages() } override def postStop() { // Cancel any active jobs in postStop hook - for (job <- dagScheduler.activeJobs) { - val error = new SparkException("Job cancelled because SparkContext was shut down") - job.listener.jobFailed(error) - // Tell the listeners that all of the running stages have ended. Don't bother - // cancelling the stages because if the DAG scheduler is stopped, the entire application - // is in the process of getting stopped. - val stageFailedMessage = "Stage cancelled because SparkContext was shut down" - dagScheduler.runningStages.foreach { stage => - val info = dagScheduler.stageToInfos(stage) - info.stageFailed(stageFailedMessage) - dagScheduler.listenerBus.post(SparkListenerStageCompleted(info)) - } - dagScheduler.listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error))) - } + dagScheduler.cleanUpAfterSchedulerStop() } } From c048d0e8b37e78c63e8b96a2d200f0450fc4737c Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 23 Apr 2014 17:23:31 -0400 Subject: [PATCH 11/19] call submitWaitingStages for every event --- .../apache/spark/scheduler/DAGScheduler.scala | 38 +++++++++++++------ .../spark/scheduler/DAGSchedulerEvent.scala | 2 +- .../spark/scheduler/TaskSetManager.scala | 2 +- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d88e8acbd72dd..8307cef22a676 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -54,7 +54,7 @@ private[spark] class DAGScheduler( private[scheduler] val sc: SparkContext, private[scheduler] val taskScheduler: TaskScheduler, - private[scheduler] val listenerBus: LiveListenerBus, + listenerBus: LiveListenerBus, mapOutputTracker: MapOutputTrackerMaster, blockManagerMaster: BlockManagerMaster, env: SparkEnv) @@ -122,8 +122,8 @@ class DAGScheduler( } // Called to report that a task has completed and results are being fetched remotely. - def taskGettingResult(task: Task[_], taskInfo: TaskInfo) { - eventProcessActor ! GettingResultEvent(task, taskInfo) + def taskGettingResult(taskInfo: TaskInfo) { + eventProcessActor ! GettingResultEvent(taskInfo) } // Called by TaskScheduler to report task completions or failures. 
@@ -164,7 +164,7 @@ class DAGScheduler( cacheLocs(rdd.id) } - private[scheduler] def clearCacheLocs() { + private def clearCacheLocs() { cacheLocs.clear() } @@ -190,7 +190,7 @@ class DAGScheduler( * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage * directly. */ - private[scheduler] def newStage( + private def newStage( rdd: RDD[_], numTasks: Int, shuffleDep: Option[ShuffleDependency[_,_]], @@ -264,7 +264,7 @@ class DAGScheduler( parents.toList } - private[scheduler] def getMissingParentStages(stage: Stage): List[Stage] = { + private def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] def visit(rdd: RDD[_]) { @@ -484,6 +484,7 @@ class DAGScheduler( reason = "as part of cancellation of all jobs")) activeJobs.clear() // These should already be empty by this point, jobIdToActiveJob.clear() // but just in case we lost track of some jobs... + submitWaitingStages() } /** @@ -509,13 +510,14 @@ class DAGScheduler( submitStage(stage) } } + submitWaitingStages() } /** * Check for waiting or failed stages which are now eligible for resubmission. * Ordinarily run on every iteration of the event loop. */ - private[scheduler] def submitWaitingStages() { + private def submitWaitingStages() { // TODO: We might want to run this less often, when we are sure that something has become // runnable that wasn't before. logTrace("Checking for newly runnable parent stages") @@ -534,7 +536,7 @@ class DAGScheduler( * We run the operation in a separate thread just in case it takes a bunch of time, so that we * don't block the DAGScheduler event loop or other concurrent jobs. */ - protected[scheduler] def runLocally(job: ActiveJob) { + protected def runLocally(job: ActiveJob) { logInfo("Computing the requested partition locally") new Thread("Local computation of job " + job.jobId) { override def run() { @@ -593,6 +595,7 @@ class DAGScheduler( groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) val jobIds = activeInGroup.map(_.jobId) jobIds.foreach(handleJobCancellation(_, "part of cancel job group")) + submitWaitingStages() } private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) { @@ -607,10 +610,12 @@ class DAGScheduler( } } listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo)) + submitWaitingStages() } private[scheduler] def handleTaskSetFailed(taskSet: TaskSet, reason: String) { stageIdToStage.get(taskSet.stageId).foreach {abortStage(_, reason) } + submitWaitingStages() } private[scheduler] def cleanUpAfterSchedulerStop() { @@ -630,6 +635,11 @@ class DAGScheduler( } } + private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) { + listenerBus.post(SparkListenerTaskGettingResult(taskInfo)) + submitWaitingStages() + } + private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, @@ -669,10 +679,11 @@ class DAGScheduler( submitStage(finalStage) } } + submitWaitingStages() } /** Submits stage, but first recursively submits any missing parents. */ - private[scheduler] def submitStage(stage: Stage) { + private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") @@ -908,6 +919,7 @@ class DAGScheduler( // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler // will abort the job. 
} + submitWaitingStages() } /** @@ -937,6 +949,7 @@ class DAGScheduler( logDebug("Additional executor lost message for " + execId + "(epoch " + currentEpoch + ")") } + submitWaitingStages() } private[scheduler] def handleExecutorAdded(execId: String, host: String) { @@ -945,6 +958,7 @@ class DAGScheduler( logInfo("Host added was in lost list earlier: " + host) failedEpoch -= execId } + submitWaitingStages() } private[scheduler] def handleStageCancellation(stageId: Int) { @@ -956,6 +970,7 @@ class DAGScheduler( } else { logInfo("No active jobs to kill for Stage " + stageId) } + submitWaitingStages() } private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") { @@ -965,6 +980,7 @@ class DAGScheduler( failJobAndIndependentStages(jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason), None) } + submitWaitingStages() } /** @@ -1162,8 +1178,8 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) - case GettingResultEvent(task, taskInfo) => - dagScheduler.listenerBus.post(SparkListenerTaskGettingResult(taskInfo)) + case GettingResultEvent(taskInfo) => + dagScheduler.handleGetTaskResult(taskInfo) case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => dagScheduler.handleTaskCompletion(completion) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 1556b56920655..23f57441b4b11 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -57,7 +57,7 @@ private[scheduler] case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent private[scheduler] -case class GettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent +case class GettingResultEvent(taskInfo: TaskInfo) extends DAGSchedulerEvent private[scheduler] case class CompletionEvent( task: Task[_], diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 86d2050a03f18..0ac5599483280 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -465,7 +465,7 @@ private[spark] class TaskSetManager( def handleTaskGettingResult(tid: Long) = { val info = taskInfos(tid) info.markGettingResult() - sched.dagScheduler.taskGettingResult(tasks(info.index), info) + sched.dagScheduler.taskGettingResult(info) } /** From 561cfbcc6847accb91be8f53e138067b2b5d958b Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 23 Apr 2014 17:43:12 -0400 Subject: [PATCH 12/19] recover doCheckpoint --- core/src/main/scala/org/apache/spark/SparkContext.scala | 1 + .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 253de6ef95b14..9cd191109b0c9 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1017,6 +1017,7 @@ class SparkContext(config: SparkConf) extends Logging { dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler, localProperties.get) logInfo("Job 
finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") + rdd.doCheckpoint() } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 8307cef22a676..48d13943b75f4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -433,7 +433,7 @@ class DAGScheduler( { val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties) waiter.awaitResult() match { - case JobSucceeded => rdd.doCheckpoint() + case JobSucceeded => case JobFailed(exception: Exception) => logInfo("Failed to run " + callSite) throw exception From cd02d9a12c27b25bdc97ba9a16830908868214d0 Mon Sep 17 00:00:00 2001 From: Nan Zhu Date: Wed, 23 Apr 2014 18:57:06 -0400 Subject: [PATCH 13/19] small fix --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 48d13943b75f4..3f76b2c11f2e4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -433,7 +433,7 @@ class DAGScheduler( { val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties) waiter.awaitResult() match { - case JobSucceeded => + case JobSucceeded => {} case JobFailed(exception: Exception) => logInfo("Failed to run " + callSite) throw exception From 310a579c7b7f31a141550d1b65f9ccde1669d26e Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 23 Apr 2014 20:04:38 -0400 Subject: [PATCH 14/19] style fix --- .../apache/spark/scheduler/DAGScheduler.scala | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 3f76b2c11f2e4..6c4dbff140207 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -594,7 +594,7 @@ class DAGScheduler( val activeInGroup = activeJobs.filter(activeJob => groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) val jobIds = activeInGroup.map(_.jobId) - jobIds.foreach(handleJobCancellation(_, "part of cancel job group")) + jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId))) submitWaitingStages() } @@ -641,13 +641,14 @@ class DAGScheduler( } private[scheduler] def handleJobSubmitted(jobId: Int, - finalRDD: RDD[_], - func: (TaskContext, Iterator[_]) => _, - partitions: Array[Int], - allowLocal: Boolean, - callSite: String, - listener: JobListener, - properties: Properties = null) { + finalRDD: RDD[_], + func: (TaskContext, Iterator[_]) => _, + partitions: Array[Int], + allowLocal: Boolean, + callSite: String, + listener: JobListener, + properties: Properties = null) + { var finalStage: Stage = null try { // New stage creation may throw an exception if, for example, jobs are run on a @@ -657,12 +658,13 @@ class DAGScheduler( case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) + return } if (finalStage != null) { val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, 
properties) clearCacheLocs() - logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)". - format(job.jobId, callSite, partitions.length, allowLocal)) + logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format( + job.jobId, callSite, partitions.length, allowLocal)) logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) From 82d08b3b73bd38255029d95441da4f516f7683ec Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 23 Apr 2014 20:31:32 -0400 Subject: [PATCH 15/19] calling actorOf on system to ensure it is blocking --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 6c4dbff140207..2737486028b65 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1134,11 +1134,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) } def receive = { - case p: Props => sender ! context.actorOf(p) + case p: Props => sender ! context.system.actorOf(p) case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor") } - dagScheduler.eventProcessActor = context.actorOf( + dagScheduler.eventProcessActor = context.system.actorOf( Props(new DAGSchedulerEventProcessActor(dagScheduler))) } From 35c886a8e75421baeececa37af8f8cbd651dec2b Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 23 Apr 2014 23:00:01 -0400 Subject: [PATCH 16/19] fix bug --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 2737486028b65..6c4dbff140207 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1134,11 +1134,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) } def receive = { - case p: Props => sender ! context.system.actorOf(p) + case p: Props => sender ! 
context.actorOf(p) case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor") } - dagScheduler.eventProcessActor = context.system.actorOf( + dagScheduler.eventProcessActor = context.actorOf( Props(new DAGSchedulerEventProcessActor(dagScheduler))) } From baf2d38b4d1dcc742914ab8a383d76540ca1a120 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 24 Apr 2014 00:04:53 -0400 Subject: [PATCH 17/19] fix the issue brought by non-blocking actorOf --- .../apache/spark/scheduler/DAGScheduler.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 6c4dbff140207..28a9e2117de1b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -74,8 +74,6 @@ class DAGScheduler( def this(sc: SparkContext) = this(sc, sc.taskScheduler) - private[scheduler] var eventProcessActor: ActorRef = _ - private[scheduler] val nextJobId = new AtomicInteger(0) private[scheduler] def numTotalJobs: Int = nextJobId.get() private val nextStageId = new AtomicInteger(0) @@ -113,8 +111,10 @@ class DAGScheduler( // stray messages to detect. private val failedEpoch = new HashMap[String, Long] - private val dagSchedulerActorSupervisor = - env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this))) + private var dagSchedulerActorSupervisor: ActorRef = _ + private[scheduler] var eventProcessActor: ActorRef = _ + + startDAGSchedulerActors() // Called by TaskScheduler to report task's starting. def taskStarted(task: Task[_], taskInfo: TaskInfo) { @@ -289,6 +289,14 @@ class DAGScheduler( missing.toList } + private def startDAGSchedulerActors() { + dagSchedulerActorSupervisor = env.actorSystem.actorOf(Props( + new DAGSchedulerActorSupervisor(this))) + while (dagSchedulerActorSupervisor == null || eventProcessActor == null) { + Thread.sleep(1) + } + } + /** * Registers the given jobId among the jobs that need the given stage and * all of that stage's ancestors. From 124d82d3ac057a5a9d2e9063b731b965fe6ab6b8 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 24 Apr 2014 19:59:03 -0400 Subject: [PATCH 18/19] blocking the constructor until event actor is ready --- .../apache/spark/scheduler/DAGScheduler.scala | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 28a9e2117de1b..614d4f2d7a4dd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -22,12 +22,16 @@ import java.util.Properties import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} +import scala.concurrent.Await import scala.concurrent.duration._ +import scala.language.postfixOps import scala.reflect.ClassTag import akka.actor._ import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy.Stop +import akka.pattern.ask +import akka.util.Timeout import org.apache.spark._ import org.apache.spark.executor.TaskMetrics @@ -111,10 +115,28 @@ class DAGScheduler( // stray messages to detect. 
private val failedEpoch = new HashMap[String, Long] - private var dagSchedulerActorSupervisor: ActorRef = _ + private val dagSchedulerActorSupervisor = + env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this))) + private[scheduler] var eventProcessActor: ActorRef = _ - startDAGSchedulerActors() + private def initializeEventProcessActor() { + try { + // blocking the thread until supervisor is started, which ensures eventProcessActor is + // not null before any job is submitted + implicit val timeout = Timeout(30 seconds) + val initEventActorReply = + dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this)) + eventProcessActor = Await.result(initEventActorReply, timeout.duration). + asInstanceOf[ActorRef] + } catch { + case e: Exception => + logError("DAGSchedulerEventProcessActor cannot be initialized, stop SparkContext") + sc.stop() + } + } + + initializeEventProcessActor() // Called by TaskScheduler to report task's starting. def taskStarted(task: Task[_], taskInfo: TaskInfo) { @@ -289,14 +311,6 @@ class DAGScheduler( missing.toList } - private def startDAGSchedulerActors() { - dagSchedulerActorSupervisor = env.actorSystem.actorOf(Props( - new DAGSchedulerActorSupervisor(this))) - while (dagSchedulerActorSupervisor == null || eventProcessActor == null) { - Thread.sleep(1) - } - } - /** * Registers the given jobId among the jobs that need the given stage and * all of that stage's ancestors. @@ -1145,9 +1159,6 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) case p: Props => sender ! context.actorOf(p) case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor") } - - dagScheduler.eventProcessActor = context.actorOf( - Props(new DAGSchedulerEventProcessActor(dagScheduler))) } private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler) From a7fb0eec915131afc928e86ac2083800f3663561 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 24 Apr 2014 20:07:31 -0400 Subject: [PATCH 19/19] throw Exception on failure of creating DAG --- .../scala/org/apache/spark/SparkContext.scala | 9 ++++++++- .../apache/spark/scheduler/DAGScheduler.scala | 20 +++++++------------ 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 9cd191109b0c9..e7b98bca290fa 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -274,7 +274,14 @@ class SparkContext(config: SparkConf) extends Logging { // Create and start the scheduler private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master) - @volatile private[spark] var dagScheduler = new DAGScheduler(this) + @volatile private[spark] var dagScheduler: DAGScheduler = _ + try { + dagScheduler = new DAGScheduler(this) + } catch { + case e: Exception => throw + new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage)) + } + // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's // constructor taskScheduler.start() diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 614d4f2d7a4dd..ef923b244deb5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -121,19 +121,13 @@ class DAGScheduler( 
private[scheduler] var eventProcessActor: ActorRef = _ private def initializeEventProcessActor() { - try { - // blocking the thread until supervisor is started, which ensures eventProcessActor is - // not null before any job is submitted - implicit val timeout = Timeout(30 seconds) - val initEventActorReply = - dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this)) - eventProcessActor = Await.result(initEventActorReply, timeout.duration). - asInstanceOf[ActorRef] - } catch { - case e: Exception => - logError("DAGSchedulerEventProcessActor cannot be initialized, stop SparkContext") - sc.stop() - } + // blocking the thread until supervisor is started, which ensures eventProcessActor is + // not null before any job is submitted + implicit val timeout = Timeout(30 seconds) + val initEventActorReply = + dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this)) + eventProcessActor = Await.result(initEventActorReply, timeout.duration). + asInstanceOf[ActorRef] } initializeEventProcessActor()
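Editor's note on the initialization handshake in the last two patches: they replace the earlier busy-wait with Akka's ask pattern, so the DAGScheduler constructor blocks until the supervisor has created the event-processing actor and replied with its ActorRef. The following standalone Scala sketch shows that same pattern in isolation; it assumes an untyped-actor Akka version of the same era as this series (2.2/2.3, where ActorSystem#shutdown is still available), and the Supervisor/Worker names are illustrative stand-ins rather than Spark classes.

    import scala.concurrent.Await
    import scala.concurrent.duration._
    import scala.language.postfixOps

    import akka.actor._
    import akka.pattern.ask
    import akka.util.Timeout

    // Parent actor: creates children on request and replies with their ActorRef,
    // so the asker can block until the child actually exists.
    class Supervisor extends Actor {
      def receive = {
        case p: Props => sender ! context.actorOf(p)
        case _ => // ignore anything else, as DAGSchedulerActorSupervisor does
      }
    }

    // Stand-in for the event-processing actor.
    class Worker extends Actor {
      def receive = {
        case msg => println("worker received: " + msg)
      }
    }

    object InitHandshakeSketch extends App {
      val system = ActorSystem("sketch")
      val supervisor = system.actorOf(Props(new Supervisor), "supervisor")

      // Ask the supervisor to create the child and wait for its reply, mirroring
      // initializeEventProcessActor(): the reference is non-null before any use.
      implicit val timeout = Timeout(30 seconds)
      val reply = supervisor ? Props(new Worker)
      val worker = Await.result(reply, timeout.duration).asInstanceOf[ActorRef]

      worker ! "hello"
      system.shutdown()
    }

If the child cannot be created within the timeout, Await.result throws, which is what allows the final patch to surface the failure from the SparkContext constructor as a SparkException instead of continuing with a null eventProcessActor.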