@@ -300,10 +300,17 @@ class SparkContext(config: SparkConf) extends Logging {
300300
301301 // Create and start the scheduler
302302 private [spark] var taskScheduler = SparkContext .createTaskScheduler(this , master)
303- taskScheduler.start()
303+ @ volatile private [spark] var dagScheduler : DAGScheduler = _
304+ try {
305+ dagScheduler = new DAGScheduler (this )
306+ } catch {
307+ case e : Exception => throw
308+ new SparkException (" DAGScheduler cannot be initialized due to %s" .format(e.getMessage))
309+ }
304310
305- @ volatile private [spark] var dagScheduler = new DAGScheduler (this )
306- dagScheduler.start()
311+ // Start the TaskScheduler only after the DAGScheduler has been constructed, since the
312+ // DAGScheduler's constructor is what sets the DAGScheduler reference on taskScheduler
313+ taskScheduler.start()
307314
308315 private [spark] val cleaner : Option [ContextCleaner ] = {
309316 if (conf.getBoolean(" spark.cleaner.referenceTracking" , true )) {
@@ -1022,8 +1029,8 @@ class SparkContext(config: SparkConf) extends Logging {
10221029 partitions : Seq [Int ],
10231030 allowLocal : Boolean ,
10241031 resultHandler : (Int , U ) => Unit ) {
1025- partitions.foreach{ p =>
1026- require(p >= 0 && p < rdd.partitions.size, s " Invalid partition requested: $p " )
1032+ if (dagScheduler == null ) {
1033+ throw new SparkException ( " SparkContext has been shutdown " )
10271034 }
10281035 val callSite = getCallSite
10291036 val cleanedFunc = clean(func)
@@ -1132,9 +1139,6 @@ class SparkContext(config: SparkConf) extends Logging {
11321139 resultHandler : (Int , U ) => Unit ,
11331140 resultFunc : => R ): SimpleFutureAction [R ] =
11341141 {
1135- partitions.foreach{ p =>
1136- require(p >= 0 && p < rdd.partitions.size, s " Invalid partition requested: $p" )
1137- }
11381142 val cleanF = clean(processPartition)
11391143 val callSite = getCallSite
11401144 val waiter = dagScheduler.submitJob(
0 commit comments