Skip to content

Commit 8d271c9

Browse files
zhpenggrxin
authored andcommitted
SPARK-1929 DAGScheduler suspended by local task OOM
DAGScheduler does not handle local task OOM properly, and will wait for the job result forever. Author: Zhen Peng <[email protected]> Closes #883 from zhpengg/bugfix-dag-scheduler-oom and squashes the following commits: 76f7eda [Zhen Peng] remove redundant memory allocations aa63161 [Zhen Peng] SPARK-1929 DAGScheduler suspended by local task OOM
1 parent 56c771c commit 8d271c9

File tree

2 files changed

+19
-1
lines changed

2 files changed

+19
-1
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.scheduler
1919

20-
import java.io.NotSerializableException
20+
import java.io.{NotSerializableException, PrintWriter, StringWriter}
2121
import java.util.Properties
2222
import java.util.concurrent.atomic.AtomicInteger
2323

@@ -580,6 +580,10 @@ class DAGScheduler(
580580
case e: Exception =>
581581
jobResult = JobFailed(e)
582582
job.listener.jobFailed(e)
583+
case oom: OutOfMemoryError =>
584+
val exception = new SparkException("job failed for Out of memory exception", oom)
585+
jobResult = JobFailed(exception)
586+
job.listener.jobFailed(exception)
583587
} finally {
584588
val s = job.finalStage
585589
stageIdToJobIds -= s.id // clean up data structures that were populated for a local job,

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,20 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
256256
assertDataStructuresEmpty
257257
}
258258

259+
test("local job oom") {
260+
val rdd = new MyRDD(sc, Nil) {
261+
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
262+
throw new java.lang.OutOfMemoryError("test local job oom")
263+
override def getPartitions = Array( new Partition { override def index = 0 } )
264+
override def getPreferredLocations(split: Partition) = Nil
265+
override def toString = "DAGSchedulerSuite Local RDD"
266+
}
267+
val jobId = scheduler.nextJobId.getAndIncrement()
268+
runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
269+
assert(results.size == 0)
270+
assertDataStructuresEmpty
271+
}
272+
259273
test("run trivial job w/ dependency") {
260274
val baseRdd = makeRdd(1, Nil)
261275
val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))

0 commit comments

Comments
 (0)