Skip to content

Commit aa63161

Browse files
committed
SPARK-1929 DAGScheduler suspended by local task OOM
1 parent d6395d8 commit aa63161

File tree

2 files changed

+22
-1
lines changed

2 files changed

+22
-1
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.scheduler
1919

20-
import java.io.NotSerializableException
20+
import java.io.{NotSerializableException, PrintWriter, StringWriter}
2121
import java.util.Properties
2222
import java.util.concurrent.atomic.AtomicInteger
2323

@@ -580,6 +580,13 @@ class DAGScheduler(
580580
case e: Exception =>
581581
jobResult = JobFailed(e)
582582
job.listener.jobFailed(e)
583+
case oom: OutOfMemoryError =>
584+
val errors: StringWriter = new StringWriter()
585+
oom.printStackTrace(new PrintWriter(errors))
586+
val exception = new SparkException("job failed for: " + oom.getMessage() +
587+
"\n" + errors.toString())
588+
jobResult = JobFailed(exception)
589+
job.listener.jobFailed(exception)
583590
} finally {
584591
val s = job.finalStage
585592
stageIdToJobIds -= s.id // clean up data structures that were populated for a local job,

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,20 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
256256
assertDataStructuresEmpty
257257
}
258258

259+
// Regression test for SPARK-1929: submits a job over an RDD whose compute() throws
// OutOfMemoryError and checks that no results are recorded and that the scheduler's
// bookkeeping is empty afterwards.
// NOTE(review): the assertions below do not check that jobListener was told about the
// failure; consider asserting on the reported exception as well.
test("local job oom") {
260+
// Anonymous MyRDD subclass with one partition whose computation always fails with an OOM.
val rdd = new MyRDD(sc, Nil) {
261+
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
262+
throw new java.lang.OutOfMemoryError("test local job oom")
263+
// Exactly one partition, at index 0 (matches the Array(0) passed to JobSubmitted below).
override def getPartitions = Array( new Partition { override def index = 0 } )
264+
// No preferred locations for any split.
override def getPreferredLocations(split: Partition) = Nil
265+
override def toString = "DAGSchedulerSuite Local RDD"
266+
}
267+
val jobId = scheduler.nextJobId.getAndIncrement()
268+
// NOTE(review): the `true` argument is presumably the allow-local flag and `null` the
// call site — confirm against JobSubmitted's parameter list.
runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
269+
// The failing task must not have produced any result.
assert(results.size == 0)
270+
assertDataStructuresEmpty
271+
}
272+
259273
test("run trivial job w/ dependency") {
260274
val baseRdd = makeRdd(1, Nil)
261275
val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))

0 commit comments

Comments
 (0)