Commit c88adbb

aarondav authored and pwendell committed
SPARK-1772 Stop catching Throwable, let Executors die
The main issue this patch fixes is [SPARK-1772](https://issues.apache.org/jira/browse/SPARK-1772), in which Executors may not die when fatal exceptions (e.g., OOM) are thrown. This patch causes Executors to delegate to the ExecutorUncaughtExceptionHandler when a fatal exception is thrown.

This patch also continues the fight in the never-ending war against `case t: Throwable =>`, by only catching Exceptions in many places, and adding a wrapper for Threads and Runnables to make sure any uncaught exceptions are at least printed to the logs.

It also turns out that it is unlikely that the IndestructibleActorSystem actually works, given testing ([here](https://gist.github.com/aarondav/ca1f0cdcd50727f89c0d)). The uncaughtExceptionHandler is not called from the places that we expected it would be. [SPARK-1620](https://issues.apache.org/jira/browse/SPARK-1620) deals with part of this issue, but refactoring our Actor Systems to ensure that exceptions are dealt with properly is a much bigger change, outside the scope of this PR.

Author: Aaron Davidson <[email protected]>

Closes #715 from aarondav/throwable and squashes the following commits:

f9b9bfe [Aaron Davidson] Remove other redundant 'throw e'
e937a0a [Aaron Davidson] Address Prashant and Matei's comments
1867867 [Aaron Davidson] [RFC] SPARK-1772 Stop catching Throwable, let Executors die

(cherry picked from commit 3af1f38)
Signed-off-by: Patrick Wendell <[email protected]>
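The Utils.scala change that adds the Thread/Runnable wrapper is among the 19 files but is not shown in this excerpt. A minimal, self-contained sketch of what such a `logUncaughtExceptions` helper could look like (using `println` where Spark would use its `Logging` trait; the actual implementation in the patch may differ):

```scala
// Sketch of a log-and-rethrow wrapper for thread bodies (assumed shape; the
// real Utils.scala diff is not shown in this excerpt). It logs any uncaught
// throwable and rethrows it, so fatal errors still reach the thread's
// uncaught-exception handler instead of vanishing silently.
def logUncaughtExceptions[T](f: => T): T = {
  try {
    f
  } catch {
    case t: Throwable =>
      // Spark would call logError here; println keeps the sketch standalone.
      println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
      throw t
  }
}
```

Call sites like `override def run(): Unit = Utils.logUncaughtExceptions { ... }` in the diffs below then get logging for free without swallowing anything.
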
1 parent 19ccf20 · commit c88adbb

19 files changed: +127 −140 lines

core/src/main/scala/org/apache/spark/ContextCleaner.scala
Lines changed: 6 additions & 5 deletions

@@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
 
 /**
  * Classes that represent cleaning tasks.
@@ -110,7 +111,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Keep cleaning RDD, shuffle, and broadcast state. */
-  private def keepCleaning() {
+  private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
     while (!stopped) {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
@@ -128,7 +129,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
           }
         }
       } catch {
-        case t: Throwable => logError("Error in cleaning thread", t)
+        case e: Exception => logError("Error in cleaning thread", e)
       }
     }
   }
@@ -141,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       listeners.foreach(_.rddCleaned(rddId))
       logInfo("Cleaned RDD " + rddId)
     } catch {
-      case t: Throwable => logError("Error cleaning RDD " + rddId, t)
+      case e: Exception => logError("Error cleaning RDD " + rddId, e)
     }
   }
 
@@ -154,7 +155,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       listeners.foreach(_.shuffleCleaned(shuffleId))
       logInfo("Cleaned shuffle " + shuffleId)
     } catch {
-      case t: Throwable => logError("Error cleaning shuffle " + shuffleId, t)
+      case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
     }
   }
 
@@ -166,7 +167,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       listeners.foreach(_.broadcastCleaned(broadcastId))
       logInfo("Cleaned broadcast " + broadcastId)
     } catch {
-      case t: Throwable => logError("Error cleaning broadcast " + broadcastId, t)
+      case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
     }
   }
 
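The narrowing from `Throwable` to `Exception` in these handlers matters because JVM fatal errors (subclasses of `java.lang.Error`, e.g. `OutOfMemoryError`) are `Throwable`s but not `Exception`s. A small illustrative sketch, where the hypothetical `cleanupStep` stands in for the body of the try block (not code from the patch):

```scala
// Hypothetical illustration: catching Exception lets Errors escape to the
// thread's uncaught-exception handler, while recoverable failures are logged.
def runStep(cleanupStep: () => Unit): Unit = {
  try {
    cleanupStep()
  } catch {
    // Matches RuntimeException, IOException, etc. -- the thread survives.
    case e: Exception => println(s"Error in cleaning thread: $e")
    // No Throwable clause: an OutOfMemoryError thrown by cleanupStep()
    // propagates out of runStep, as SPARK-1772 intends.
  }
}
```
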
core/src/main/scala/org/apache/spark/SparkContext.scala
Lines changed: 6 additions & 6 deletions

@@ -1494,8 +1494,8 @@
      } catch {
        // TODO: Enumerate the exact reasons why it can fail
        // But irrespective of it, it means we cannot proceed !
-       case th: Throwable => {
-         throw new SparkException("YARN mode not available ?", th)
+       case e: Exception => {
+         throw new SparkException("YARN mode not available ?", e)
        }
      }
      val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
@@ -1510,8 +1510,8 @@
        cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
 
      } catch {
-       case th: Throwable => {
-         throw new SparkException("YARN mode not available ?", th)
+       case e: Exception => {
+         throw new SparkException("YARN mode not available ?", e)
        }
      }
 
@@ -1521,8 +1521,8 @@
        val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
        cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
      } catch {
-       case th: Throwable => {
-         throw new SparkException("YARN mode not available ?", th)
+       case e: Exception => {
+         throw new SparkException("YARN mode not available ?", e)
        }
      }
 
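Catching `Exception` is sufficient in these three blocks because the expected reflection failures (`ClassNotFoundException`, `NoSuchMethodException`, `InstantiationException`, and friends) are all `Exception`s; linkage and VM errors now propagate instead of being wrapped as "YARN mode not available ?". A hedged sketch of the pattern, using `RuntimeException` in place of Spark's `SparkException` so it compiles standalone:

```scala
// Sketch of the reflection pattern from the three hunks above (simplified).
def instantiateByName(className: String): AnyRef =
  try {
    Class.forName(className).getConstructor().newInstance().asInstanceOf[AnyRef]
  } catch {
    // ClassNotFoundException etc. are Exceptions and get a friendly wrapper;
    // a NoClassDefFoundError or OutOfMemoryError is rethrown by the JVM as-is.
    case e: Exception => throw new RuntimeException(s"$className not available?", e)
  }
```
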
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
Lines changed: 1 addition & 2 deletions

@@ -171,7 +171,7 @@ private[spark] class PythonRDD[T: ClassTag](
       this.interrupt()
     }
 
-    override def run() {
+    override def run(): Unit = Utils.logUncaughtExceptions {
       try {
         SparkEnv.set(env)
         val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
@@ -282,7 +282,6 @@ private[spark] object PythonRDD {
       }
     } catch {
       case eof: EOFException => {}
-      case e: Throwable => throw e
     }
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
Lines changed: 0 additions & 1 deletion

@@ -71,7 +71,6 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
           stopDaemon()
           startDaemon()
           new Socket(daemonHost, daemonPort)
-        case e: Throwable => throw e
       }
     }
   }

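Both Python-related deletions remove the same anti-pattern: `case e: Throwable => throw e` catches every throwable only to rethrow it unchanged, which is behaviorally a no-op (the JVM preserves the original stack trace when the same object is rethrown). A tiny hypothetical demonstration, not code from the patch:

```scala
// Hypothetical demonstration that a catch-and-rethrow clause adds nothing.
def withRedundantRethrow(): Int =
  try { "not a number".toInt }
  catch { case e: Throwable => throw e } // caught, then rethrown unchanged

def withoutCatch(): Int =
  "not a number".toInt // identical observable behavior, clearer intent
```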

core/src/main/scala/org/apache/spark/deploy/Client.scala
Lines changed: 1 addition & 1 deletion

@@ -157,7 +157,7 @@ object Client {
     // TODO: See if we can initialize akka so return messages are sent back using the same TCP
     // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
     val (actorSystem, _) = AkkaUtils.createActorSystem(
-      "driverClient", Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
+      "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 
     actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
 
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Lines changed: 1 addition & 1 deletion

@@ -103,7 +103,7 @@ object SparkHadoopUtil {
         .newInstance()
         .asInstanceOf[SparkHadoopUtil]
     } catch {
-      case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+      case e: Exception => throw new SparkException("Unable to load YARN support", e)
     }
   } else {
     new SparkHadoopUtil

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
Lines changed: 4 additions & 4 deletions

@@ -70,7 +70,7 @@ class HistoryServer(
    * TODO: Add a mechanism to update manually.
    */
   private val logCheckingThread = new Thread {
-    override def run() {
+    override def run(): Unit = Utils.logUncaughtExceptions {
       while (!stopped) {
         val now = System.currentTimeMillis
         if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
@@ -154,7 +154,7 @@ class HistoryServer(
         numCompletedApplications = logInfos.size
 
       } catch {
-        case t: Throwable => logError("Exception in checking for event log updates", t)
+        case e: Exception => logError("Exception in checking for event log updates", e)
       }
     } else {
       logWarning("Attempted to check for event log updates before binding the server.")
@@ -231,8 +231,8 @@ class HistoryServer(
         dir.getModificationTime
       }
     } catch {
-      case t: Throwable =>
-        logError("Exception in accessing modification time of %s".format(dir.getPath), t)
+      case e: Exception =>
+        logError("Exception in accessing modification time of %s".format(dir.getPath), e)
         -1L
     }
   }

core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Lines changed: 2 additions & 2 deletions

@@ -684,8 +684,8 @@ private[spark] class Master(
       webUi.attachSparkUI(ui)
       return true
     } catch {
-      case t: Throwable =>
-        logError("Exception in replaying log for application %s (%s)".format(appName, app.id), t)
+      case e: Exception =>
+        logError("Exception in replaying log for application %s (%s)".format(appName, app.id), e)
     }
   } else {
     logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir))

core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
Lines changed: 1 addition & 1 deletion

@@ -31,7 +31,7 @@ object DriverWrapper {
       case workerUrl :: mainClass :: extraArgs =>
         val conf = new SparkConf()
         val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
-          Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
+          Utils.localHostName(), 0, conf, new SecurityManager(conf))
         actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
 
         // Delegate to supplied main class

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
Lines changed: 1 addition & 1 deletion

@@ -105,7 +105,7 @@ private[spark] object CoarseGrainedExecutorBackend {
     // Create a new ActorSystem to run the backend, because we can't create a
     // SparkEnv / Executor before getting started with all our system properties, etc
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-      indestructible = true, conf = conf, new SecurityManager(conf))
+      conf, new SecurityManager(conf))
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     actorSystem.actorOf(

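With `indestructible = true` gone (and `IndestructibleActorSystem` shown not to work as hoped), the executor instead relies on a process-wide uncaught-exception handler. The `ExecutorUncaughtExceptionHandler` diff is not in this excerpt; the sketch below shows only the general shape such a handler could take, with hypothetical exit codes (the real class and its codes may differ):

```scala
// Sketch of a process-wide handler that lets the executor die loudly on fatal
// errors, so the cluster manager can restart it (exit codes are hypothetical).
object DieLoudlyHandler extends Thread.UncaughtExceptionHandler {
  override def uncaughtException(thread: Thread, throwable: Throwable): Unit = {
    System.err.println(s"Uncaught exception in thread $thread: $throwable")
    throwable match {
      case _: OutOfMemoryError => System.exit(52) // hypothetical OOM exit code
      case _                   => System.exit(50) // hypothetical generic code
    }
  }
}

// Installed once at executor startup (illustrative):
// Thread.setDefaultUncaughtExceptionHandler(DieLoudlyHandler)
```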
